1 /*****************************************************************************
2 * RRDtool 1.0.33 Copyright Tobias Oetiker, 1997 - 2000
3 *****************************************************************************
4 * rrd_update.c RRD Update Function
5 *****************************************************************************
6 * $Id$
7 * $Log$
8 * Revision 1.3 2001/03/04 13:01:55 oetiker
9 * Aberrant Behavior Detection support. A brief overview added to rrdtool.pod.
10 * Major updates to rrd_update.c, rrd_create.c. Minor update to other core files.
11 * This is backwards compatible! But new files using the Aberrant stuff are not readable
12 * by old rrdtool versions. See http://cricket.sourceforge.net/aberrant/rrd_hw.htm
13 * -- Jake Brutlag <jakeb@corp.webtv.net>
14 *
15 * Revision 1.2 2001/03/04 11:14:25 oetiker
16 * added at-style-time@value:value syntax to rrd_update
17 * -- Dave Bodenstab <imdave@mcs.net>
18 *
19 * Revision 1.1.1.1 2001/02/25 22:25:06 oetiker
20 * checkin
21 *
22 *****************************************************************************/
24 #include "rrd_tool.h"
25 #include <sys/types.h>
26 #include <fcntl.h>
28 #ifdef WIN32
29 #include <sys/locking.h>
30 #include <sys/stat.h>
31 #include <io.h>
32 #endif
34 /* Prototypes */
35 int LockRRD(FILE *rrd_file);
36 void write_RRA_row (rrd_t *rrd, unsigned long rra_idx, unsigned long *rra_current,
37 unsigned short CDP_scratch_idx, FILE *rrd_file);
39 /*#define DEBUG */
/* Return Y when X is NaN, otherwise X.
 * NOTE: X is evaluated twice (once by isnan, once as the result), so
 * arguments must be free of side effects.
 * The macro must NOT end in a semicolon: a function-like macro that
 * swallows the semicolon expands to a stray empty statement at best,
 * and is a syntax error when used inside a larger expression. */
#define IFDNAN(X,Y) (isnan(X) ? (Y) : (X))
#ifdef STANDALONE
/* Stand-alone entry point for the rrdupdate binary: run the update,
 * and on failure print the usage banner followed by the collected
 * error message.  Returns 1 on error, 0 on success. */
int
main(int argc, char **argv)
{
    rrd_update(argc, argv);

    /* guard clause: nothing to report on success */
    if (!rrd_test_error())
        return 0;

    printf("RRDtool 1.0.33 Copyright 1997-2000 by Tobias Oetiker <tobi@oetiker.ch>\n\n"
           "Usage: rrdupdate filename\n"
           "\t\t\t[--template|-t ds-name:ds-name:...]\n"
           "\t\t\ttime|N:value[:value...]\n\n"
           "\t\t\tat-time@value[:value...]\n\n"
           "\t\t\t[ time:value[:value...] ..]\n\n");
    printf("ERROR: %s\n", rrd_get_error());
    rrd_clear_error();
    return 1;
}
#endif /* STANDALONE */
64 int
65 rrd_update(int argc, char **argv)
66 {
68 int arg_i = 2;
69 short j;
70 long i,ii,iii=1;
72 unsigned long rra_begin; /* byte pointer to the rra
73 * area in the rrd file. this
74 * pointer never changes value */
75 unsigned long rra_start; /* byte pointer to the rra
76 * area in the rrd file. this
77 * pointer changes as each rrd is
78 * processed. */
79 unsigned long rra_current; /* byte pointer to the current write
80 * spot in the rrd file. */
81 unsigned long rra_pos_tmp; /* temporary byte pointer. */
82 unsigned long interval,
83 pre_int,post_int; /* interval between this and
84 * the last run */
85 unsigned long proc_pdp_st; /* which pdp_st was the last
86 * to be processed */
87 unsigned long occu_pdp_st; /* when was the pdp_st
88 * before the last update
89 * time */
90 unsigned long proc_pdp_age; /* how old was the data in
91 * the pdp prep area when it
92 * was last updated */
93 unsigned long occu_pdp_age; /* how long ago was the last
94 * pdp_step time */
95 rrd_value_t *pdp_new; /* prepare the incoming data
* to be added to the
97 * existing entry */
98 rrd_value_t *pdp_temp; /* prepare the pdp values
* to be added to the
100 * cdp values */
102 long *tmpl_idx; /* index representing the settings
103 transported by the template index */
104 long tmpl_cnt = 2; /* time and data */
106 FILE *rrd_file;
107 rrd_t rrd;
108 time_t current_time = time(NULL);
109 char **updvals;
110 int schedule_smooth = 0;
111 char *template = NULL;
112 rrd_value_t *seasonal_coef = NULL, *last_seasonal_coef = NULL;
113 /* a vector of future Holt-Winters seasonal coefs */
114 unsigned long elapsed_pdp_st;
115 /* number of elapsed PDP steps since last update */
116 unsigned long *rra_step_cnt = NULL;
117 /* number of rows to be updated in an RRA for a data
118 * value. */
119 unsigned long start_pdp_offset;
120 /* number of PDP steps since the last update that
121 * are assigned to the first CDP to be generated
122 * since the last update. */
123 unsigned short scratch_idx;
124 /* index into the CDP scratch array */
125 enum cf_en current_cf;
126 /* numeric id of the current consolidation function */
128 while (1) {
129 static struct option long_options[] =
130 {
131 {"template", required_argument, 0, 't'},
132 {0,0,0,0}
133 };
134 int option_index = 0;
135 int opt;
136 opt = getopt_long(argc, argv, "t:",
137 long_options, &option_index);
139 if (opt == EOF)
140 break;
142 switch(opt) {
143 case 't':
144 template = optarg;
145 break;
147 case '?':
148 rrd_set_error("unknown option '%s'",argv[optind-1]);
149 rrd_free(&rrd);
150 return(-1);
151 }
152 }
154 /* need at least 2 arguments: filename, data. */
155 if (argc-optind < 2) {
156 rrd_set_error("Not enough arguments");
157 return -1;
158 }
160 if(rrd_open(argv[optind],&rrd_file,&rrd, RRD_READWRITE)==-1){
161 return -1;
162 }
163 rra_current = rra_start = rra_begin = ftell(rrd_file);
/* This is defined in the ANSI C standard, section 7.9.5.3:

   When a file is opened with update mode ('+' as the second
   or third character in the ... list of mode argument
   variables), both input and output may be performed on the
   associated stream. However, ... input may not be directly
   followed by output without an intervening call to a file
   positioning function, unless the input operation encounters
   end-of-file. */
173 fseek(rrd_file, 0, SEEK_CUR);
176 /* get exclusive lock to whole file.
177 * lock gets removed when we close the file.
178 */
179 if (LockRRD(rrd_file) != 0) {
180 rrd_set_error("could not lock RRD");
181 rrd_free(&rrd);
182 fclose(rrd_file);
183 return(-1);
184 }
186 if((updvals = malloc( sizeof(char*) * (rrd.stat_head->ds_cnt+1)))==NULL){
187 rrd_set_error("allocating updvals pointer array");
188 rrd_free(&rrd);
189 fclose(rrd_file);
190 return(-1);
191 }
193 if ((pdp_temp = malloc(sizeof(rrd_value_t)
194 *rrd.stat_head->ds_cnt))==NULL){
195 rrd_set_error("allocating pdp_temp ...");
196 free(updvals);
197 rrd_free(&rrd);
198 fclose(rrd_file);
199 return(-1);
200 }
202 if ((tmpl_idx = malloc(sizeof(unsigned long)
203 *(rrd.stat_head->ds_cnt+1)))==NULL){
204 rrd_set_error("allocating tmpl_idx ...");
205 free(pdp_temp);
206 free(updvals);
207 rrd_free(&rrd);
208 fclose(rrd_file);
209 return(-1);
210 }
211 /* initialize template redirector */
212 /* default config
213 tmpl_idx[0] -> 0; (time)
214 tmpl_idx[1] -> 1; (DS 0)
215 tmpl_idx[2] -> 2; (DS 1)
216 tmpl_idx[3] -> 3; (DS 2)
217 ... */
218 for (i=0;i<=rrd.stat_head->ds_cnt;i++) tmpl_idx[i]=i;
219 tmpl_cnt=rrd.stat_head->ds_cnt+1;
220 if (template) {
221 char *dsname;
222 int tmpl_len;
223 dsname = template;
224 tmpl_cnt = 1; /* the first entry is the time */
225 tmpl_len = strlen(template);
226 for(i=0;i<=tmpl_len ;i++) {
227 if (template[i] == ':' || template[i] == '\0') {
228 template[i] = '\0';
229 if (tmpl_cnt>rrd.stat_head->ds_cnt){
230 rrd_set_error("Template contains more DS definitions than RRD");
231 free(updvals); free(pdp_temp);
232 free(tmpl_idx); rrd_free(&rrd);
233 fclose(rrd_file); return(-1);
234 }
235 if ((tmpl_idx[tmpl_cnt++] = ds_match(&rrd,dsname)) == -1){
236 rrd_set_error("unknown DS name '%s'",dsname);
237 free(updvals); free(pdp_temp);
238 free(tmpl_idx); rrd_free(&rrd);
239 fclose(rrd_file); return(-1);
240 } else {
241 /* the first element is always the time */
242 tmpl_idx[tmpl_cnt-1]++;
243 /* go to the next entry on the template */
244 dsname = &template[i+1];
245 /* fix the damage we did before */
246 if (i<tmpl_len) {
247 template[i]=':';
248 }
250 }
251 }
252 }
253 }
254 if ((pdp_new = malloc(sizeof(rrd_value_t)
255 *rrd.stat_head->ds_cnt))==NULL){
256 rrd_set_error("allocating pdp_new ...");
257 free(updvals);
258 free(pdp_temp);
259 free(tmpl_idx);
260 rrd_free(&rrd);
261 fclose(rrd_file);
262 return(-1);
263 }
265 /* loop through the arguments. */
266 for(arg_i=optind+1; arg_i<argc;arg_i++) {
267 char *stepper = malloc((strlen(argv[arg_i])+1)*sizeof(char));
268 char *step_start = stepper;
269 char *p;
270 char *parsetime_error = NULL;
271 enum {atstyle, normal} timesyntax;
272 struct time_value ds_tv;
273 if (stepper == NULL){
274 rrd_set_error("failed duplication argv entry");
275 free(updvals);
276 free(pdp_temp);
277 free(tmpl_idx);
278 rrd_free(&rrd);
279 fclose(rrd_file);
280 return(-1);
281 }
282 /* initialize all ds input to unknown except the first one
283 which has always got to be set */
284 for(ii=1;ii<=rrd.stat_head->ds_cnt;ii++) updvals[ii] = "U";
285 strcpy(stepper,argv[arg_i]);
286 updvals[0]=stepper;
287 /* separate all ds elements; first must be examined separately
288 due to alternate time syntax */
289 if ((p=strchr(stepper,'@'))!=NULL) {
290 timesyntax = atstyle;
291 *p = '\0';
292 stepper = p+1;
293 } else if ((p=strchr(stepper,':'))!=NULL) {
294 timesyntax = normal;
295 *p = '\0';
296 stepper = p+1;
297 } else {
298 rrd_set_error("expected timestamp not found in data source from %s:...",
299 argv[arg_i]);
300 free(step_start);
301 break;
302 }
303 ii=1;
304 updvals[tmpl_idx[ii]] = stepper;
305 while (*stepper) {
306 if (*stepper == ':') {
307 *stepper = '\0';
308 ii++;
309 if (ii<tmpl_cnt){
310 updvals[tmpl_idx[ii]] = stepper+1;
311 }
312 }
313 stepper++;
314 }
316 if (ii != tmpl_cnt-1) {
317 rrd_set_error("expected %lu data source readings (got %lu) from %s:...",
318 tmpl_cnt-1, ii, argv[arg_i]);
319 free(step_start);
320 break;
321 }
323 /* get the time from the reading ... handle N */
324 if (timesyntax == atstyle) {
325 if ((parsetime_error = parsetime(updvals[0], &ds_tv))) {
326 rrd_set_error("ds time: %s: %s", updvals[0], parsetime_error );
327 free(step_start);
328 break;
329 }
330 if (ds_tv.type == RELATIVE_TO_END_TIME ||
331 ds_tv.type == RELATIVE_TO_START_TIME) {
332 rrd_set_error("specifying time relative to the 'start' "
333 "or 'end' makes no sense here: %s",
334 updvals[0]);
335 free(step_start);
336 break;
337 }
339 current_time = mktime(&ds_tv.tm) + ds_tv.offset;
340 } else if (strcmp(updvals[0],"N")==0){
341 current_time = time(NULL);
342 } else {
343 current_time = atol(updvals[0]);
344 }
346 if(current_time <= rrd.live_head->last_up){
347 rrd_set_error("illegal attempt to update using time %ld when "
348 "last update time is %ld (minimum one second step)",
349 current_time, rrd.live_head->last_up);
350 free(step_start);
351 break;
352 }
355 /* seek to the beginning of the rra's */
356 if (rra_current != rra_begin) {
357 if(fseek(rrd_file, rra_begin, SEEK_SET) != 0) {
358 rrd_set_error("seek error in rrd");
359 free(step_start);
360 break;
361 }
362 rra_current = rra_begin;
363 }
364 rra_start = rra_begin;
366 /* when was the current pdp started */
367 proc_pdp_age = rrd.live_head->last_up % rrd.stat_head->pdp_step;
368 proc_pdp_st = rrd.live_head->last_up - proc_pdp_age;
370 /* when did the last pdp_st occur */
371 occu_pdp_age = current_time % rrd.stat_head->pdp_step;
372 occu_pdp_st = current_time - occu_pdp_age;
373 interval = current_time - rrd.live_head->last_up;
375 if (occu_pdp_st > proc_pdp_st){
376 /* OK we passed the pdp_st moment*/
377 pre_int = occu_pdp_st - rrd.live_head->last_up; /* how much of the input data
378 * occurred before the latest
379 * pdp_st moment*/
380 post_int = occu_pdp_age; /* how much after it */
381 } else {
382 pre_int = interval;
383 post_int = 0;
384 }
386 #ifdef DEBUG
387 printf(
388 "proc_pdp_age %lu\t"
389 "proc_pdp_st %lu\t"
390 "occu_pfp_age %lu\t"
391 "occu_pdp_st %lu\t"
392 "int %lu\t"
393 "pre_int %lu\t"
394 "post_int %lu\n", proc_pdp_age, proc_pdp_st,
395 occu_pdp_age, occu_pdp_st,
396 interval, pre_int, post_int);
397 #endif
399 /* process the data sources and update the pdp_prep
400 * area accordingly */
401 for(i=0;i<rrd.stat_head->ds_cnt;i++){
402 enum dst_en dst_idx;
403 dst_idx= dst_conv(rrd.ds_def[i].dst);
404 if((updvals[i+1][0] != 'U') &&
405 rrd.ds_def[i].par[DS_mrhb_cnt].u_cnt >= interval) {
406 double rate = DNAN;
407 /* the data source type defines how to process the data */
408 /* pdp_temp contains rate * time ... eg the bytes
409 * transferred during the interval. Doing it this way saves
410 * a lot of math operations */
413 switch(dst_idx){
414 case DST_COUNTER:
415 case DST_DERIVE:
416 if(rrd.pdp_prep[i].last_ds[0] != 'U'){
417 pdp_new[i]= rrd_diff(updvals[i+1],rrd.pdp_prep[i].last_ds);
418 if(dst_idx == DST_COUNTER) {
/* simple overflow catcher suggested by Andres Kroonmaa */
420 /* this will fail terribly for non 32 or 64 bit counters ... */
421 /* are there any others in SNMP land ? */
422 if (pdp_new[i] < (double)0.0 )
423 pdp_new[i] += (double)4294967296.0 ; /* 2^32 */
424 if (pdp_new[i] < (double)0.0 )
425 pdp_new[i] += (double)18446744069414584320.0; /* 2^64-2^32 */;
426 }
427 rate = pdp_new[i] / interval;
428 }
429 else {
430 pdp_new[i]= DNAN;
431 }
432 break;
433 case DST_ABSOLUTE:
434 pdp_new[i]= atof(updvals[i+1]);
435 rate = pdp_new[i] / interval;
436 break;
437 case DST_GAUGE:
438 pdp_new[i] = atof(updvals[i+1]) * interval;
439 rate = pdp_new[i] / interval;
440 break;
441 default:
442 rrd_set_error("rrd contains unknown DS type : '%s'",
443 rrd.ds_def[i].dst);
444 break;
445 }
446 /* break out of this for loop if the error string is set */
447 if (rrd_test_error()){
448 break;
449 }
450 /* make sure pdp_temp is neither too large or too small
451 * if any of these occur it becomes unknown ...
452 * sorry folks ... */
453 if ( ! isnan(rate) &&
454 (( ! isnan(rrd.ds_def[i].par[DS_max_val].u_val) &&
455 rate > rrd.ds_def[i].par[DS_max_val].u_val ) ||
456 ( ! isnan(rrd.ds_def[i].par[DS_min_val].u_val) &&
457 rate < rrd.ds_def[i].par[DS_min_val].u_val ))){
458 pdp_new[i] = DNAN;
459 }
460 } else {
461 /* no news is news all the same */
462 pdp_new[i] = DNAN;
463 }
465 /* make a copy of the command line argument for the next run */
466 #ifdef DEBUG
467 fprintf(stderr,
468 "prep ds[%lu]\t"
469 "last_arg '%s'\t"
470 "this_arg '%s'\t"
471 "pdp_new %10.2f\n",
472 i,
473 rrd.pdp_prep[i].last_ds,
474 updvals[i+1], pdp_new[i]);
475 #endif
476 if(dst_idx == DST_COUNTER || dst_idx == DST_DERIVE){
477 strncpy(rrd.pdp_prep[i].last_ds,
478 updvals[i+1],LAST_DS_LEN-1);
479 rrd.pdp_prep[i].last_ds[LAST_DS_LEN-1]='\0';
480 }
481 }
482 /* break out of the argument parsing loop if the error_string is set */
483 if (rrd_test_error()){
484 free(step_start);
485 break;
486 }
487 /* has a pdp_st moment occurred since the last run ? */
489 if (proc_pdp_st == occu_pdp_st){
490 /* no we have not passed a pdp_st moment. therefore update is simple */
492 for(i=0;i<rrd.stat_head->ds_cnt;i++){
493 if(isnan(pdp_new[i]))
494 rrd.pdp_prep[i].scratch[PDP_unkn_sec_cnt].u_cnt += interval;
495 else
496 rrd.pdp_prep[i].scratch[PDP_val].u_val+= pdp_new[i];
497 #ifdef DEBUG
498 fprintf(stderr,
499 "NO PDP ds[%lu]\t"
500 "value %10.2f\t"
501 "unkn_sec %5lu\n",
502 i,
503 rrd.pdp_prep[i].scratch[PDP_val].u_val,
504 rrd.pdp_prep[i].scratch[PDP_unkn_sec_cnt].u_cnt);
505 #endif
506 }
507 } else {
/* a pdp_st has occurred. */
510 /* in pdp_prep[].scratch[PDP_val].u_val we have collected rate*seconds which
511 * occurred up to the last run.
512 pdp_new[] contains rate*seconds from the latest run.
513 pdp_temp[] will contain the rate for cdp */
516 for(i=0;i<rrd.stat_head->ds_cnt;i++){
517 /* update pdp_prep to the current pdp_st */
518 if(isnan(pdp_new[i]))
519 rrd.pdp_prep[i].scratch[PDP_unkn_sec_cnt].u_cnt += pre_int;
520 else
521 rrd.pdp_prep[i].scratch[PDP_val].u_val +=
522 pdp_new[i]/(double)interval*(double)pre_int;
524 /* if too much of the pdp_prep is unknown we dump it */
525 if ((rrd.pdp_prep[i].scratch[PDP_unkn_sec_cnt].u_cnt
526 > rrd.ds_def[i].par[DS_mrhb_cnt].u_cnt) ||
527 (occu_pdp_st-proc_pdp_st <=
528 rrd.pdp_prep[i].scratch[PDP_unkn_sec_cnt].u_cnt)) {
529 pdp_temp[i] = DNAN;
530 } else {
531 pdp_temp[i] = rrd.pdp_prep[i].scratch[PDP_val].u_val
532 / (double)( occu_pdp_st
533 - proc_pdp_st
534 - rrd.pdp_prep[i].scratch[PDP_unkn_sec_cnt].u_cnt);
535 }
536 /* make pdp_prep ready for the next run */
537 if(isnan(pdp_new[i])){
538 rrd.pdp_prep[i].scratch[PDP_unkn_sec_cnt].u_cnt = post_int;
539 rrd.pdp_prep[i].scratch[PDP_val].u_val = 0.0;
540 } else {
541 rrd.pdp_prep[i].scratch[PDP_unkn_sec_cnt].u_cnt = 0;
542 rrd.pdp_prep[i].scratch[PDP_val].u_val =
543 pdp_new[i]/(double)interval*(double)post_int;
544 }
546 #ifdef DEBUG
547 fprintf(stderr,
548 "PDP UPD ds[%lu]\t"
549 "pdp_temp %10.2f\t"
550 "new_prep %10.2f\t"
551 "new_unkn_sec %5lu\n",
552 i, pdp_temp[i],
553 rrd.pdp_prep[i].scratch[PDP_val].u_val,
554 rrd.pdp_prep[i].scratch[PDP_unkn_sec_cnt].u_cnt);
555 #endif
556 }
558 /* compute the number of elapsed pdp_st moments */
559 elapsed_pdp_st = (occu_pdp_st - proc_pdp_st) / rrd.stat_head -> pdp_step;
560 #ifdef DEBUG
561 fprintf(stderr,"elapsed PDP steps: %lu\n", elapsed_pdp_st);
562 #endif
563 if (rra_step_cnt == NULL)
564 {
565 rra_step_cnt = (unsigned long *)
566 malloc((rrd.stat_head->rra_cnt)* sizeof(unsigned long));
567 }
569 for(i = 0, rra_start = rra_begin;
570 i < rrd.stat_head->rra_cnt;
571 rra_start += rrd.rra_def[i].row_cnt * rrd.stat_head -> ds_cnt * sizeof(rrd_value_t),
572 i++)
573 {
574 current_cf = cf_conv(rrd.rra_def[i].cf_nam);
575 start_pdp_offset = rrd.rra_def[i].pdp_cnt -
576 (proc_pdp_st / rrd.stat_head -> pdp_step) % rrd.rra_def[i].pdp_cnt;
577 if (start_pdp_offset <= elapsed_pdp_st) {
578 rra_step_cnt[i] = (elapsed_pdp_st - start_pdp_offset) /
579 rrd.rra_def[i].pdp_cnt + 1;
580 } else {
581 rra_step_cnt[i] = 0;
582 }
584 if (current_cf == CF_SEASONAL || current_cf == CF_DEVSEASONAL)
585 {
586 /* If this is a bulk update, we need to skip ahead in the seasonal
587 * arrays so that they will be correct for the next observed value;
588 * note that for the bulk update itself, no update will occur to
* DEVSEASONAL or SEASONAL; furthermore, HWPREDICT and DEVPREDICT will
590 * be set to DNAN. */
591 if (rra_step_cnt[i] > 2)
592 {
593 /* skip update by resetting rra_step_cnt[i],
594 * note that this is not data source specific; this is due
595 * to the bulk update, not a DNAN value for the specific data
596 * source. */
597 rra_step_cnt[i] = 0;
598 lookup_seasonal(&rrd,i,rra_start,rrd_file,elapsed_pdp_st,
599 &last_seasonal_coef);
600 lookup_seasonal(&rrd,i,rra_start,rrd_file,elapsed_pdp_st + 1,
601 &seasonal_coef);
602 }
604 /* periodically run a smoother for seasonal effects */
605 /* Need to use first cdp parameter buffer to track
606 * burnin (burnin requires a specific smoothing schedule).
607 * The CDP_init_seasonal parameter is really an RRA level,
608 * not a data source within RRA level parameter, but the rra_def
609 * is read only for rrd_update (not flushed to disk). */
610 iii = i*(rrd.stat_head -> ds_cnt);
611 if (rrd.cdp_prep[iii].scratch[CDP_init_seasonal].u_cnt
612 <= BURNIN_CYCLES)
613 {
614 if (rrd.rra_ptr[i].cur_row + elapsed_pdp_st
615 > rrd.rra_def[i].row_cnt - 1) {
616 /* mark off one of the burnin cycles */
617 ++(rrd.cdp_prep[iii].scratch[CDP_init_seasonal].u_cnt);
618 schedule_smooth = 1;
619 }
620 } else {
621 /* someone has no doubt invented a trick to deal with this
622 * wrap around, but at least this code is clear. */
623 if (rrd.rra_def[i].par[RRA_seasonal_smooth_idx].u_cnt >
624 rrd.rra_ptr[i].cur_row)
625 {
626 /* here elapsed_pdp_st = rra_step_cnt[i] because of 1-1
627 * mapping between PDP and CDP */
628 if (rrd.rra_ptr[i].cur_row + elapsed_pdp_st
629 >= rrd.rra_def[i].par[RRA_seasonal_smooth_idx].u_cnt)
630 {
631 #ifdef DEBUG
632 fprintf(stderr,
633 "schedule_smooth 1: cur_row %lu, elapsed_pdp_st %lu, smooth idx %lu\n",
634 rrd.rra_ptr[i].cur_row, elapsed_pdp_st,
635 rrd.rra_def[i].par[RRA_seasonal_smooth_idx].u_cnt);
636 #endif
637 schedule_smooth = 1;
638 }
639 } else {
640 /* can't rely on negative numbers because we are working with
641 * unsigned values */
642 /* Don't need modulus here. If we've wrapped more than once, only
643 * one smooth is executed at the end. */
644 if (rrd.rra_ptr[i].cur_row + elapsed_pdp_st >= rrd.rra_def[i].row_cnt
645 && rrd.rra_ptr[i].cur_row + elapsed_pdp_st - rrd.rra_def[i].row_cnt
646 >= rrd.rra_def[i].par[RRA_seasonal_smooth_idx].u_cnt)
647 {
648 #ifdef DEBUG
649 fprintf(stderr,
650 "schedule_smooth 2: cur_row %lu, elapsed_pdp_st %lu, smooth idx %lu\n",
651 rrd.rra_ptr[i].cur_row, elapsed_pdp_st,
652 rrd.rra_def[i].par[RRA_seasonal_smooth_idx].u_cnt);
653 #endif
654 schedule_smooth = 1;
655 }
656 }
657 }
659 rra_current = ftell(rrd_file);
660 } /* if cf is DEVSEASONAL or SEASONAL */
662 if (rrd_test_error()) break;
664 /* update CDP_PREP areas */
/* loop over data sources within each RRA */
666 for(ii = 0;
667 ii < rrd.stat_head->ds_cnt;
668 ii++)
669 {
671 /* iii indexes the CDP prep area for this data source within the RRA */
672 iii=i*rrd.stat_head->ds_cnt+ii;
674 if (rrd.rra_def[i].pdp_cnt > 1) {
676 if (rra_step_cnt[i] > 0) {
677 /* If we are in this block, as least 1 CDP value will be written to
678 * disk, this is the CDP_primary_val entry. If more than 1 value needs
679 * to be written, then the "fill in" value is the CDP_secondary_val
680 * entry. */
681 if (isnan(pdp_temp[ii]))
682 {
683 rrd.cdp_prep[iii].scratch[CDP_unkn_pdp_cnt].u_cnt += start_pdp_offset;
684 rrd.cdp_prep[iii].scratch[CDP_secondary_val].u_val = DNAN;
685 } else {
686 /* CDP_secondary value is the RRA "fill in" value for intermediary
687 * CDP data entries. No matter the CF, the value is the same because
688 * the average, max, min, and last of a list of identical values is
689 * the same, namely, the value itself. */
690 rrd.cdp_prep[iii].scratch[CDP_secondary_val].u_val = pdp_temp[ii];
691 }
693 if (rrd.cdp_prep[iii].scratch[CDP_unkn_pdp_cnt].u_cnt
694 > rrd.rra_def[i].pdp_cnt*
695 rrd.rra_def[i].par[RRA_cdp_xff_val].u_val)
696 {
697 rrd.cdp_prep[iii].scratch[CDP_primary_val].u_val = DNAN;
698 /* initialize carry over */
699 if (current_cf == CF_AVERAGE) {
700 if (isnan(pdp_temp[ii])) {
701 rrd.cdp_prep[iii].scratch[CDP_val].u_val = DNAN;
702 } else {
703 rrd.cdp_prep[iii].scratch[CDP_val].u_val = pdp_temp[ii] *
704 ((elapsed_pdp_st - start_pdp_offset) % rrd.rra_def[i].pdp_cnt);
705 }
706 } else {
707 rrd.cdp_prep[iii].scratch[CDP_val].u_val = pdp_temp[ii];
708 }
709 } else {
710 rrd_value_t cum_val, cur_val;
711 switch (current_cf) {
712 case CF_AVERAGE:
713 cum_val = IFDNAN(rrd.cdp_prep[iii].scratch[CDP_val].u_val, 0.0);
714 cur_val = IFDNAN(pdp_temp[ii],0.0);
715 rrd.cdp_prep[iii].scratch[CDP_primary_val].u_val =
716 (cum_val + cur_val) /
717 (rrd.rra_def[i].pdp_cnt
718 -rrd.cdp_prep[iii].scratch[CDP_unkn_pdp_cnt].u_cnt);
719 /* initialize carry over value */
720 if (isnan(pdp_temp[ii])) {
721 rrd.cdp_prep[iii].scratch[CDP_val].u_val = DNAN;
722 } else {
723 rrd.cdp_prep[iii].scratch[CDP_val].u_val = pdp_temp[ii] *
724 ((elapsed_pdp_st - start_pdp_offset) % rrd.rra_def[i].pdp_cnt);
725 }
726 break;
727 case CF_MAXIMUM:
728 cum_val = IFDNAN(rrd.cdp_prep[iii].scratch[CDP_val].u_val, -DINF);
729 cur_val = IFDNAN(pdp_temp[ii],-DINF);
730 #ifdef DEBUG
731 if (isnan(rrd.cdp_prep[iii].scratch[CDP_val].u_val) &&
732 isnan(pdp_temp[ii])) {
733 fprintf(stderr,
734 "RRA %lu, DS %lu, both CDP_val and pdp_temp are DNAN!",
735 i,ii);
736 exit(-1);
737 }
738 #endif
739 if (cur_val > cum_val)
740 rrd.cdp_prep[iii].scratch[CDP_primary_val].u_val = cur_val;
741 else
742 rrd.cdp_prep[iii].scratch[CDP_primary_val].u_val = cum_val;
743 /* initialize carry over value */
744 rrd.cdp_prep[iii].scratch[CDP_val].u_val = pdp_temp[ii];
745 break;
746 case CF_MINIMUM:
747 cum_val = IFDNAN(rrd.cdp_prep[iii].scratch[CDP_val].u_val, DINF);
748 cur_val = IFDNAN(pdp_temp[ii],DINF);
749 #ifdef DEBUG
750 if (isnan(rrd.cdp_prep[iii].scratch[CDP_val].u_val) &&
751 isnan(pdp_temp[ii])) {
752 fprintf(stderr,
753 "RRA %lu, DS %lu, both CDP_val and pdp_temp are DNAN!",
754 i,ii);
755 exit(-1);
756 }
757 #endif
758 if (cur_val < cum_val)
759 rrd.cdp_prep[iii].scratch[CDP_primary_val].u_val = cur_val;
760 else
761 rrd.cdp_prep[iii].scratch[CDP_primary_val].u_val = cum_val;
762 /* initialize carry over value */
763 rrd.cdp_prep[iii].scratch[CDP_val].u_val = pdp_temp[ii];
764 break;
765 case CF_LAST:
766 default:
767 rrd.cdp_prep[iii].scratch[CDP_primary_val].u_val = pdp_temp[ii];
768 /* initialize carry over value */
769 rrd.cdp_prep[iii].scratch[CDP_val].u_val = pdp_temp[ii];
770 break;
771 }
772 } /* endif meets xff value requirement for a valid value */
773 /* initialize carry over CDP_unkn_pdp_cnt, this must after CDP_primary_val
774 * is set because CDP_unkn_pdp_cnt is required to compute that value. */
775 if (isnan(pdp_temp[ii]))
776 rrd.cdp_prep[iii].scratch[CDP_unkn_pdp_cnt].u_cnt =
777 (elapsed_pdp_st - start_pdp_offset) % rrd.rra_def[i].pdp_cnt;
778 else
779 rrd.cdp_prep[iii].scratch[CDP_unkn_pdp_cnt].u_cnt = 0;
780 } else /* rra_step_cnt[i] == 0 */
781 {
782 #ifdef DEBUG
783 if (isnan(rrd.cdp_prep[iii].scratch[CDP_val].u_val)) {
784 fprintf(stderr,"schedule CDP_val update, RRA %lu DS %lu, DNAN\n",
785 i,ii);
786 } else {
787 fprintf(stderr,"schedule CDP_val update, RRA %lu DS %lu, %10.2f\n",
788 i,ii,rrd.cdp_prep[iii].scratch[CDP_val].u_val);
789 }
790 #endif
791 if (isnan(pdp_temp[ii])) {
792 rrd.cdp_prep[iii].scratch[CDP_unkn_pdp_cnt].u_cnt += elapsed_pdp_st;
793 } else if (isnan(rrd.cdp_prep[iii].scratch[CDP_val].u_val))
794 {
795 if (current_cf == CF_AVERAGE) {
796 rrd.cdp_prep[iii].scratch[CDP_val].u_val = pdp_temp[ii] *
797 elapsed_pdp_st;
798 } else {
799 rrd.cdp_prep[iii].scratch[CDP_val].u_val = pdp_temp[ii];
800 }
801 #ifdef DEBUG
802 fprintf(stderr,"Initialize CDP_val for RRA %lu DS %lu: %10.2f\n",
803 i,ii,rrd.cdp_prep[iii].scratch[CDP_val].u_val);
804 #endif
805 } else {
806 switch (current_cf) {
807 case CF_AVERAGE:
808 rrd.cdp_prep[iii].scratch[CDP_val].u_val += pdp_temp[ii] *
809 elapsed_pdp_st;
810 break;
811 case CF_MINIMUM:
812 if (pdp_temp[ii] < rrd.cdp_prep[iii].scratch[CDP_val].u_val)
813 rrd.cdp_prep[iii].scratch[CDP_val].u_val = pdp_temp[ii];
814 break;
815 case CF_MAXIMUM:
816 if (pdp_temp[ii] > rrd.cdp_prep[iii].scratch[CDP_val].u_val)
817 rrd.cdp_prep[iii].scratch[CDP_val].u_val = pdp_temp[ii];
818 break;
819 case CF_LAST:
820 default:
821 rrd.cdp_prep[iii].scratch[CDP_val].u_val = pdp_temp[ii];
822 break;
823 }
824 }
825 }
826 } else { /* rrd.rra_def[i].pdp_cnt == 1 */
827 if (elapsed_pdp_st > 2)
828 {
829 switch (current_cf) {
830 case CF_AVERAGE:
831 default:
832 rrd.cdp_prep[iii].scratch[CDP_primary_val].u_val=pdp_temp[ii];
833 rrd.cdp_prep[iii].scratch[CDP_secondary_val].u_val=pdp_temp[ii];
834 break;
835 case CF_SEASONAL:
836 case CF_DEVSEASONAL:
837 /* need to update cached seasonal values, so they are consistent
838 * with the bulk update */
839 /* WARNING: code relies on the fact that CDP_hw_last_seasonal and
840 * CDP_last_deviation are the same. */
841 rrd.cdp_prep[iii].scratch[CDP_hw_last_seasonal].u_val =
842 last_seasonal_coef[ii];
843 rrd.cdp_prep[iii].scratch[CDP_hw_seasonal].u_val =
844 seasonal_coef[ii];
845 break;
846 case CF_HWPREDICT:
847 /* need to update the null_count and last_null_count.
848 * even do this for non-DNAN pdp_temp because the
849 * algorithm is not learning from batch updates. */
850 rrd.cdp_prep[iii].scratch[CDP_null_count].u_cnt +=
851 elapsed_pdp_st;
852 rrd.cdp_prep[iii].scratch[CDP_last_null_count].u_cnt +=
853 elapsed_pdp_st - 1;
854 /* fall through */
855 case CF_DEVPREDICT:
856 rrd.cdp_prep[iii].scratch[CDP_primary_val].u_val = DNAN;
857 rrd.cdp_prep[iii].scratch[CDP_secondary_val].u_val = DNAN;
858 break;
859 case CF_FAILURES:
860 /* do not count missed bulk values as failures */
861 rrd.cdp_prep[iii].scratch[CDP_primary_val].u_val = 0;
862 rrd.cdp_prep[iii].scratch[CDP_secondary_val].u_val = 0;
863 /* need to reset violations buffer.
864 * could do this more carefully, but for now, just
865 * assume a bulk update wipes away all violations. */
866 erase_violations(&rrd, iii, i);
867 break;
868 }
869 }
870 } /* endif rrd.rra_def[i].pdp_cnt == 1 */
872 if (rrd_test_error()) break;
874 } /* endif data sources loop */
875 } /* end RRA Loop */
877 /* this loop is only entered if elapsed_pdp_st < 3 */
878 for (j = elapsed_pdp_st, scratch_idx = CDP_primary_val;
879 j > 0 && j < 3; j--, scratch_idx = CDP_secondary_val)
880 {
881 for(i = 0, rra_start = rra_begin;
882 i < rrd.stat_head->rra_cnt;
883 rra_start += rrd.rra_def[i].row_cnt * rrd.stat_head -> ds_cnt * sizeof(rrd_value_t),
884 i++)
885 {
886 if (rrd.rra_def[i].pdp_cnt > 1) continue;
888 current_cf = cf_conv(rrd.rra_def[i].cf_nam);
889 if (current_cf == CF_SEASONAL || current_cf == CF_DEVSEASONAL)
890 {
891 lookup_seasonal(&rrd,i,rra_start,rrd_file,
892 elapsed_pdp_st + (scratch_idx == CDP_primary_val ? 1 : 2),
893 &seasonal_coef);
894 }
895 if (rrd_test_error()) break;
/* loop over data sources within each RRA */
897 for(ii = 0;
898 ii < rrd.stat_head->ds_cnt;
899 ii++)
900 {
901 update_aberrant_CF(&rrd,pdp_temp[ii],current_cf,
902 i*(rrd.stat_head->ds_cnt) + ii,i,ii,
903 scratch_idx, seasonal_coef);
904 }
905 } /* end RRA Loop */
906 if (rrd_test_error()) break;
907 } /* end elapsed_pdp_st loop */
909 if (rrd_test_error()) break;
911 /* Ready to write to disk */
912 /* Move sequentially through the file, writing one RRA at a time.
913 * Note this architecture divorces the computation of CDP with
914 * flushing updated RRA entries to disk. */
915 for(i = 0, rra_start = rra_begin;
916 i < rrd.stat_head->rra_cnt;
917 rra_start += rrd.rra_def[i].row_cnt * rrd.stat_head -> ds_cnt * sizeof(rrd_value_t),
918 i++) {
919 /* is there anything to write for this RRA? If not, continue. */
920 if (rra_step_cnt[i] == 0) continue;
922 /* write the first row */
923 #ifdef DEBUG
924 fprintf(stderr," -- RRA Preseek %ld\n",ftell(rrd_file));
925 #endif
926 rrd.rra_ptr[i].cur_row++;
927 if (rrd.rra_ptr[i].cur_row >= rrd.rra_def[i].row_cnt)
928 rrd.rra_ptr[i].cur_row = 0; /* wrap around */
/* position on the first row */
930 rra_pos_tmp = rra_start +
931 (rrd.stat_head->ds_cnt)*(rrd.rra_ptr[i].cur_row)*sizeof(rrd_value_t);
932 if(rra_pos_tmp != rra_current) {
933 if(fseek(rrd_file, rra_pos_tmp, SEEK_SET) != 0){
934 rrd_set_error("seek error in rrd");
935 break;
936 }
937 rra_current = rra_pos_tmp;
938 }
939 #ifdef DEBUG
940 fprintf(stderr," -- RRA Postseek %ld\n",ftell(rrd_file));
941 #endif
942 scratch_idx = CDP_primary_val;
943 write_RRA_row(&rrd, i, &rra_current, scratch_idx, rrd_file);
944 if (rrd_test_error()) break;
946 /* write other rows of the bulk update, if any */
947 scratch_idx = CDP_secondary_val;
948 for ( ; rra_step_cnt[i] > 1;
949 rra_step_cnt[i]--, rrd.rra_ptr[i].cur_row++)
950 {
951 if (rrd.rra_ptr[i].cur_row == rrd.rra_def[i].row_cnt)
952 {
953 #ifdef DEBUG
954 fprintf(stderr,"Wraparound for RRA %s, %lu updates left\n",
955 rrd.rra_def[i].cf_nam, rra_step_cnt[i] - 1);
956 #endif
957 /* wrap */
958 rrd.rra_ptr[i].cur_row = 0;
959 /* seek back to beginning of current rra */
960 if (fseek(rrd_file, rra_start, SEEK_SET) != 0)
961 {
962 rrd_set_error("seek error in rrd");
963 break;
964 }
965 #ifdef DEBUG
966 fprintf(stderr," -- Wraparound Postseek %ld\n",ftell(rrd_file));
967 #endif
968 rra_current = rra_start;
969 }
970 write_RRA_row(&rrd, i, &rra_current, scratch_idx, rrd_file);
971 }
973 if (rrd_test_error())
974 break;
975 } /* RRA LOOP */
977 /* break out of the argument parsing loop if error_string is set */
978 if (rrd_test_error()){
979 free(step_start);
980 break;
981 }
983 } /* endif a pdp_st has occurred */
984 rrd.live_head->last_up = current_time;
985 free(step_start);
986 } /* function argument loop */
988 if (seasonal_coef != NULL) free(seasonal_coef);
989 if (last_seasonal_coef != NULL) free(last_seasonal_coef);
990 if (rra_step_cnt != NULL) free(rra_step_cnt);
992 /* if we got here and if there is an error and if the file has not been
993 * written to, then close things up and return. */
994 if (rrd_test_error()) {
995 free(updvals);
996 free(tmpl_idx);
997 rrd_free(&rrd);
998 free(pdp_temp);
999 free(pdp_new);
1000 fclose(rrd_file);
1001 return(-1);
1002 }
1004 /* aargh ... that was tough ... so many loops ... anyway, its done.
1005 * we just need to write back the live header portion now*/
1007 if (fseek(rrd_file, (sizeof(stat_head_t)
1008 + sizeof(ds_def_t)*rrd.stat_head->ds_cnt
1009 + sizeof(rra_def_t)*rrd.stat_head->rra_cnt),
1010 SEEK_SET) != 0) {
1011 rrd_set_error("seek rrd for live header writeback");
1012 free(updvals);
1013 free(tmpl_idx);
1014 rrd_free(&rrd);
1015 free(pdp_temp);
1016 free(pdp_new);
1017 fclose(rrd_file);
1018 return(-1);
1019 }
1021 if(fwrite( rrd.live_head,
1022 sizeof(live_head_t), 1, rrd_file) != 1){
1023 rrd_set_error("fwrite live_head to rrd");
1024 free(updvals);
1025 rrd_free(&rrd);
1026 free(tmpl_idx);
1027 free(pdp_temp);
1028 free(pdp_new);
1029 fclose(rrd_file);
1030 return(-1);
1031 }
1033 if(fwrite( rrd.pdp_prep,
1034 sizeof(pdp_prep_t),
1035 rrd.stat_head->ds_cnt, rrd_file) != rrd.stat_head->ds_cnt){
1036 rrd_set_error("ftwrite pdp_prep to rrd");
1037 free(updvals);
1038 rrd_free(&rrd);
1039 free(tmpl_idx);
1040 free(pdp_temp);
1041 free(pdp_new);
1042 fclose(rrd_file);
1043 return(-1);
1044 }
1046 if(fwrite( rrd.cdp_prep,
1047 sizeof(cdp_prep_t),
1048 rrd.stat_head->rra_cnt *rrd.stat_head->ds_cnt, rrd_file)
1049 != rrd.stat_head->rra_cnt *rrd.stat_head->ds_cnt){
1051 rrd_set_error("ftwrite cdp_prep to rrd");
1052 free(updvals);
1053 free(tmpl_idx);
1054 rrd_free(&rrd);
1055 free(pdp_temp);
1056 free(pdp_new);
1057 fclose(rrd_file);
1058 return(-1);
1059 }
1061 if(fwrite( rrd.rra_ptr,
1062 sizeof(rra_ptr_t),
1063 rrd.stat_head->rra_cnt,rrd_file) != rrd.stat_head->rra_cnt){
1064 rrd_set_error("fwrite rra_ptr to rrd");
1065 free(updvals);
1066 free(tmpl_idx);
1067 rrd_free(&rrd);
1068 free(pdp_temp);
1069 free(pdp_new);
1070 fclose(rrd_file);
1071 return(-1);
1072 }
1074 /* OK now close the files and free the memory */
1075 if(fclose(rrd_file) != 0){
1076 rrd_set_error("closing rrd");
1077 free(updvals);
1078 free(tmpl_idx);
1079 rrd_free(&rrd);
1080 free(pdp_temp);
1081 free(pdp_new);
1082 return(-1);
1083 }
1085 /* calling the smoothing code here guarantees at most
1086 * one smoothing operation per rrd_update call. Unfortunately,
1087 * it is possible with bulk updates, or a long-delayed update
1088 * for smoothing to occur off-schedule. This really isn't
1089 * critical except during the burning cycles. */
1090 if (schedule_smooth)
1091 {
1092 #ifndef WIN32
1093 rrd_file = fopen(argv[optind],"r+");
1094 #else
1095 rrd_file = fopen(argv[optind],"rb+");
1096 #endif
1097 rra_start = rra_begin;
1098 for (i = 0; i < rrd.stat_head -> rra_cnt; ++i)
1099 {
1100 if (cf_conv(rrd.rra_def[i].cf_nam) == CF_DEVSEASONAL ||
1101 cf_conv(rrd.rra_def[i].cf_nam) == CF_SEASONAL)
1102 {
1103 #ifdef DEBUG
1104 fprintf(stderr,"Running smoother for rra %ld\n",i);
1105 #endif
1106 apply_smoother(&rrd,i,rra_start,rrd_file);
1107 if (rrd_test_error())
1108 break;
1109 }
1110 rra_start += rrd.rra_def[i].row_cnt
1111 *rrd.stat_head->ds_cnt*sizeof(rrd_value_t);
1112 }
1113 fclose(rrd_file);
1114 }
1115 rrd_free(&rrd);
1116 free(updvals);
1117 free(tmpl_idx);
1118 free(pdp_new);
1119 free(pdp_temp);
1120 return(0);
1121 }
1123 /*
1124 * get exclusive lock to whole file.
1125 * lock gets removed when we close the file
1126 *
1127 * returns 0 on success
1128 */
1129 int
1130 LockRRD(FILE *rrdfile)
1131 {
1132 int rrd_fd; /* File descriptor for RRD */
1133 int stat;
1135 rrd_fd = fileno(rrdfile);
1137 {
1138 #ifndef WIN32
1139 struct flock lock;
1140 lock.l_type = F_WRLCK; /* exclusive write lock */
1141 lock.l_len = 0; /* whole file */
1142 lock.l_start = 0; /* start of file */
1143 lock.l_whence = SEEK_SET; /* end of file */
1145 stat = fcntl(rrd_fd, F_SETLK, &lock);
1146 #else
1147 struct _stat st;
1149 if ( _fstat( rrd_fd, &st ) == 0 ) {
1150 stat = _locking ( rrd_fd, _LK_NBLCK, st.st_size );
1151 } else {
1152 stat = -1;
1153 }
1154 #endif
1155 }
1157 return(stat);
1158 }
1161 void
1162 write_RRA_row (rrd_t *rrd, unsigned long rra_idx, unsigned long *rra_current,
1163 unsigned short CDP_scratch_idx, FILE *rrd_file)
1164 {
1165 unsigned long ds_idx, cdp_idx;
1167 for (ds_idx = 0; ds_idx < rrd -> stat_head -> ds_cnt; ds_idx++)
1168 {
1169 /* compute the cdp index */
1170 cdp_idx =rra_idx * (rrd -> stat_head->ds_cnt) + ds_idx;
1171 #ifdef DEBUG
1172 fprintf(stderr," -- RRA WRITE VALUE %e, at %ld CF:%s\n",
1173 rrd -> cdp_prep[cdp_idx].scratch[CDP_scratch_idx].u_val,ftell(rrd_file),
1174 rrd -> rra_def[rra_idx].cf_nam);
1175 #endif
1177 if(fwrite(&(rrd -> cdp_prep[cdp_idx].scratch[CDP_scratch_idx].u_val),
1178 sizeof(rrd_value_t),1,rrd_file) != 1)
1179 {
1180 rrd_set_error("writing rrd");
1181 return;
1182 }
1183 *rra_current += sizeof(rrd_value_t);
1184 }
1185 }