1 /*****************************************************************************
2 * RRDtool 1.0.33 Copyright Tobias Oetiker, 1997 - 2000
3 *****************************************************************************
4 * rrd_update.c RRD Update Function
5 *****************************************************************************
6 * $Id$
7 * $Log$
8 * Revision 1.3 2001/03/04 13:01:56 oetiker
9 * Aberrant Behavior Detection support. A brief overview added to rrdtool.pod.
10 * Major updates to rrd_update.c, rrd_create.c. Minor update to other core files.
11 * This is backwards compatible! But new files using the Aberrant stuff are not readable
12 * by old rrdtool versions. See http://cricket.sourceforge.net/aberrant/rrd_hw.htm
13 * -- Jake Brutlag <jakeb@corp.webtv.net>
14 *
15 * Revision 1.2 2001/03/04 11:14:25 oetiker
16 * added at-style-time@value:value syntax to rrd_update
17 * -- Dave Bodenstab <imdave@mcs.net>
18 * Revision 1.1 2001/02/25 22:25:06 oetiker
19 * Initial revision
20 *
21 *****************************************************************************/
23 #include "rrd_tool.h"
24 #include <sys/types.h>
25 #include <fcntl.h>
27 #ifdef WIN32
28 #include <sys/locking.h>
29 #include <sys/stat.h>
30 #include <io.h>
31 #endif
33 /* Prototypes */
34 int LockRRD(FILE *rrd_file);
35 void write_RRA_row (rrd_t *rrd, unsigned long rra_idx, unsigned long *rra_current,
36 unsigned short CDP_scratch_idx, FILE *rrd_file);
38 /*#define DEBUG */
/* IFDNAN(X,Y): evaluate to X, or to the fallback Y when X is NaN.
 * NOTE: X is evaluated twice (once by isnan, once as the result), so
 * avoid arguments with side effects.
 * Fix: the previous definition ended with a stray semicolon, which made
 * the macro expand to an extra empty statement and made it impossible to
 * use inside larger expressions; the semicolon belongs at the call site. */
#define IFDNAN(X,Y) (isnan(X) ? (Y) : (X))
#ifdef STANDALONE
/* Command-line entry point for the stand-alone "rrdupdate" binary.
 * Hands all arguments straight to rrd_update(); on failure it prints
 * the usage banner followed by the library error text and exits with
 * a non-zero status. */
int
main(int argc, char **argv){
    rrd_update(argc,argv);
    /* guard clause: nothing more to do on success */
    if (!rrd_test_error()) {
        return 0;
    }
    printf("RRDtool 1.0.33 Copyright 1997-2000 by Tobias Oetiker <tobi@oetiker.ch>\n\n"
           "Usage: rrdupdate filename\n"
           "\t\t\t[--template|-t ds-name:ds-name:...]\n"
           "\t\t\ttime|N:value[:value...]\n\n"
           "\t\t\tat-time@value[:value...]\n\n"
           "\t\t\t[ time:value[:value...] ..]\n\n");
    printf("ERROR: %s\n",rrd_get_error());
    rrd_clear_error();
    return 1;
}
#endif
63 int
64 rrd_update(int argc, char **argv)
65 {
67 int arg_i = 2;
68 short j;
69 long i,ii,iii=1;
71 unsigned long rra_begin; /* byte pointer to the rra
72 * area in the rrd file. this
73 * pointer never changes value */
74 unsigned long rra_start; /* byte pointer to the rra
75 * area in the rrd file. this
76 * pointer changes as each rrd is
77 * processed. */
78 unsigned long rra_current; /* byte pointer to the current write
79 * spot in the rrd file. */
80 unsigned long rra_pos_tmp; /* temporary byte pointer. */
81 unsigned long interval,
82 pre_int,post_int; /* interval between this and
83 * the last run */
84 unsigned long proc_pdp_st; /* which pdp_st was the last
85 * to be processed */
86 unsigned long occu_pdp_st; /* when was the pdp_st
87 * before the last update
88 * time */
89 unsigned long proc_pdp_age; /* how old was the data in
90 * the pdp prep area when it
91 * was last updated */
92 unsigned long occu_pdp_age; /* how long ago was the last
93 * pdp_step time */
94 rrd_value_t *pdp_new; /* prepare the incoming data
95 * to be added the the
96 * existing entry */
97 rrd_value_t *pdp_temp; /* prepare the pdp values
98 * to be added the the
99 * cdp values */
101 long *tmpl_idx; /* index representing the settings
102 transported by the template index */
103 long tmpl_cnt = 2; /* time and data */
105 FILE *rrd_file;
106 rrd_t rrd;
107 time_t current_time = time(NULL);
108 char **updvals;
109 int schedule_smooth = 0;
110 char *template = NULL;
111 rrd_value_t *seasonal_coef = NULL, *last_seasonal_coef = NULL;
112 /* a vector of future Holt-Winters seasonal coefs */
113 unsigned long elapsed_pdp_st;
114 /* number of elapsed PDP steps since last update */
115 unsigned long *rra_step_cnt = NULL;
116 /* number of rows to be updated in an RRA for a data
117 * value. */
118 unsigned long start_pdp_offset;
119 /* number of PDP steps since the last update that
120 * are assigned to the first CDP to be generated
121 * since the last update. */
122 unsigned short scratch_idx;
123 /* index into the CDP scratch array */
124 enum cf_en current_cf;
125 /* numeric id of the current consolidation function */
127 while (1) {
128 static struct option long_options[] =
129 {
130 {"template", required_argument, 0, 't'},
131 {0,0,0,0}
132 };
133 int option_index = 0;
134 int opt;
135 opt = getopt_long(argc, argv, "t:",
136 long_options, &option_index);
138 if (opt == EOF)
139 break;
141 switch(opt) {
142 case 't':
143 template = optarg;
144 break;
146 case '?':
147 rrd_set_error("unknown option '%s'",argv[optind-1]);
148 rrd_free(&rrd);
149 return(-1);
150 }
151 }
153 /* need at least 2 arguments: filename, data. */
154 if (argc-optind < 2) {
155 rrd_set_error("Not enough arguments");
156 return -1;
157 }
159 if(rrd_open(argv[optind],&rrd_file,&rrd, RRD_READWRITE)==-1){
160 return -1;
161 }
162 rra_current = rra_start = rra_begin = ftell(rrd_file);
163 /* This is defined in the ANSI C standard, section 7.9.5.3:
165 When a file is opened with update mode ('+' as the second
166 or third character in the ... list of mode argument
167 variables), both input and output may be performed on the
168 associated stream. However, ... input may not be directly
169 followed by output without an intervening call to a file
170 positioning function, unless the input operation encounters
171 end-of-file. */
172 fseek(rrd_file, 0, SEEK_CUR);
175 /* get exclusive lock to whole file.
176 * lock gets removed when we close the file.
177 */
178 if (LockRRD(rrd_file) != 0) {
179 rrd_set_error("could not lock RRD");
180 rrd_free(&rrd);
181 fclose(rrd_file);
182 return(-1);
183 }
185 if((updvals = malloc( sizeof(char*) * (rrd.stat_head->ds_cnt+1)))==NULL){
186 rrd_set_error("allocating updvals pointer array");
187 rrd_free(&rrd);
188 fclose(rrd_file);
189 return(-1);
190 }
192 if ((pdp_temp = malloc(sizeof(rrd_value_t)
193 *rrd.stat_head->ds_cnt))==NULL){
194 rrd_set_error("allocating pdp_temp ...");
195 free(updvals);
196 rrd_free(&rrd);
197 fclose(rrd_file);
198 return(-1);
199 }
201 if ((tmpl_idx = malloc(sizeof(unsigned long)
202 *(rrd.stat_head->ds_cnt+1)))==NULL){
203 rrd_set_error("allocating tmpl_idx ...");
204 free(pdp_temp);
205 free(updvals);
206 rrd_free(&rrd);
207 fclose(rrd_file);
208 return(-1);
209 }
210 /* initialize template redirector */
211 /* default config
212 tmpl_idx[0] -> 0; (time)
213 tmpl_idx[1] -> 1; (DS 0)
214 tmpl_idx[2] -> 2; (DS 1)
215 tmpl_idx[3] -> 3; (DS 2)
216 ... */
217 for (i=0;i<=rrd.stat_head->ds_cnt;i++) tmpl_idx[i]=i;
218 tmpl_cnt=rrd.stat_head->ds_cnt+1;
219 if (template) {
220 char *dsname;
221 int tmpl_len;
222 dsname = template;
223 tmpl_cnt = 1; /* the first entry is the time */
224 tmpl_len = strlen(template);
225 for(i=0;i<=tmpl_len ;i++) {
226 if (template[i] == ':' || template[i] == '\0') {
227 template[i] = '\0';
228 if (tmpl_cnt>rrd.stat_head->ds_cnt){
229 rrd_set_error("Template contains more DS definitions than RRD");
230 free(updvals); free(pdp_temp);
231 free(tmpl_idx); rrd_free(&rrd);
232 fclose(rrd_file); return(-1);
233 }
234 if ((tmpl_idx[tmpl_cnt++] = ds_match(&rrd,dsname)) == -1){
235 rrd_set_error("unknown DS name '%s'",dsname);
236 free(updvals); free(pdp_temp);
237 free(tmpl_idx); rrd_free(&rrd);
238 fclose(rrd_file); return(-1);
239 } else {
240 /* the first element is always the time */
241 tmpl_idx[tmpl_cnt-1]++;
242 /* go to the next entry on the template */
243 dsname = &template[i+1];
244 /* fix the damage we did before */
245 if (i<tmpl_len) {
246 template[i]=':';
247 }
249 }
250 }
251 }
252 }
253 if ((pdp_new = malloc(sizeof(rrd_value_t)
254 *rrd.stat_head->ds_cnt))==NULL){
255 rrd_set_error("allocating pdp_new ...");
256 free(updvals);
257 free(pdp_temp);
258 free(tmpl_idx);
259 rrd_free(&rrd);
260 fclose(rrd_file);
261 return(-1);
262 }
264 /* loop through the arguments. */
265 for(arg_i=optind+1; arg_i<argc;arg_i++) {
266 char *stepper = malloc((strlen(argv[arg_i])+1)*sizeof(char));
267 char *step_start = stepper;
268 char *p;
269 char *parsetime_error = NULL;
270 enum {atstyle, normal} timesyntax;
271 struct time_value ds_tv;
272 if (stepper == NULL){
273 rrd_set_error("failed duplication argv entry");
274 free(updvals);
275 free(pdp_temp);
276 free(tmpl_idx);
277 rrd_free(&rrd);
278 fclose(rrd_file);
279 return(-1);
280 }
281 /* initialize all ds input to unknown except the first one
282 which has always got to be set */
283 for(ii=1;ii<=rrd.stat_head->ds_cnt;ii++) updvals[ii] = "U";
284 strcpy(stepper,argv[arg_i]);
285 updvals[0]=stepper;
286 /* separate all ds elements; first must be examined separately
287 due to alternate time syntax */
288 if ((p=strchr(stepper,'@'))!=NULL) {
289 timesyntax = atstyle;
290 *p = '\0';
291 stepper = p+1;
292 } else if ((p=strchr(stepper,':'))!=NULL) {
293 timesyntax = normal;
294 *p = '\0';
295 stepper = p+1;
296 } else {
297 rrd_set_error("expected timestamp not found in data source from %s:...",
298 argv[arg_i]);
299 free(step_start);
300 break;
301 }
302 ii=1;
303 updvals[tmpl_idx[ii]] = stepper;
304 while (*stepper) {
305 if (*stepper == ':') {
306 *stepper = '\0';
307 ii++;
308 if (ii<tmpl_cnt){
309 updvals[tmpl_idx[ii]] = stepper+1;
310 }
311 }
312 stepper++;
313 }
315 if (ii != tmpl_cnt-1) {
316 rrd_set_error("expected %lu data source readings (got %lu) from %s:...",
317 tmpl_cnt-1, ii, argv[arg_i]);
318 free(step_start);
319 break;
320 }
322 /* get the time from the reading ... handle N */
323 if (timesyntax == atstyle) {
324 if ((parsetime_error = parsetime(updvals[0], &ds_tv))) {
325 rrd_set_error("ds time: %s: %s", updvals[0], parsetime_error );
326 free(step_start);
327 break;
328 }
329 if (ds_tv.type == RELATIVE_TO_END_TIME ||
330 ds_tv.type == RELATIVE_TO_START_TIME) {
331 rrd_set_error("specifying time relative to the 'start' "
332 "or 'end' makes no sense here: %s",
333 updvals[0]);
334 free(step_start);
335 break;
336 }
338 current_time = mktime(&ds_tv.tm) + ds_tv.offset;
339 } else if (strcmp(updvals[0],"N")==0){
340 current_time = time(NULL);
341 } else {
342 current_time = atol(updvals[0]);
343 }
345 if(current_time <= rrd.live_head->last_up){
346 rrd_set_error("illegal attempt to update using time %ld when "
347 "last update time is %ld (minimum one second step)",
348 current_time, rrd.live_head->last_up);
349 free(step_start);
350 break;
351 }
354 /* seek to the beginning of the rra's */
355 if (rra_current != rra_begin) {
356 if(fseek(rrd_file, rra_begin, SEEK_SET) != 0) {
357 rrd_set_error("seek error in rrd");
358 free(step_start);
359 break;
360 }
361 rra_current = rra_begin;
362 }
363 rra_start = rra_begin;
365 /* when was the current pdp started */
366 proc_pdp_age = rrd.live_head->last_up % rrd.stat_head->pdp_step;
367 proc_pdp_st = rrd.live_head->last_up - proc_pdp_age;
369 /* when did the last pdp_st occur */
370 occu_pdp_age = current_time % rrd.stat_head->pdp_step;
371 occu_pdp_st = current_time - occu_pdp_age;
372 interval = current_time - rrd.live_head->last_up;
374 if (occu_pdp_st > proc_pdp_st){
375 /* OK we passed the pdp_st moment*/
376 pre_int = occu_pdp_st - rrd.live_head->last_up; /* how much of the input data
377 * occurred before the latest
378 * pdp_st moment*/
379 post_int = occu_pdp_age; /* how much after it */
380 } else {
381 pre_int = interval;
382 post_int = 0;
383 }
385 #ifdef DEBUG
386 printf(
387 "proc_pdp_age %lu\t"
388 "proc_pdp_st %lu\t"
389 "occu_pfp_age %lu\t"
390 "occu_pdp_st %lu\t"
391 "int %lu\t"
392 "pre_int %lu\t"
393 "post_int %lu\n", proc_pdp_age, proc_pdp_st,
394 occu_pdp_age, occu_pdp_st,
395 interval, pre_int, post_int);
396 #endif
398 /* process the data sources and update the pdp_prep
399 * area accordingly */
400 for(i=0;i<rrd.stat_head->ds_cnt;i++){
401 enum dst_en dst_idx;
402 dst_idx= dst_conv(rrd.ds_def[i].dst);
403 if((updvals[i+1][0] != 'U') &&
404 rrd.ds_def[i].par[DS_mrhb_cnt].u_cnt >= interval) {
405 double rate = DNAN;
406 /* the data source type defines how to process the data */
407 /* pdp_temp contains rate * time ... eg the bytes
408 * transferred during the interval. Doing it this way saves
409 * a lot of math operations */
412 switch(dst_idx){
413 case DST_COUNTER:
414 case DST_DERIVE:
415 if(rrd.pdp_prep[i].last_ds[0] != 'U'){
416 pdp_new[i]= rrd_diff(updvals[i+1],rrd.pdp_prep[i].last_ds);
417 if(dst_idx == DST_COUNTER) {
418 /* simple overflow catcher suggested by Andres Kroonmaa */
419 /* this will fail terribly for non 32 or 64 bit counters ... */
420 /* are there any others in SNMP land ? */
421 if (pdp_new[i] < (double)0.0 )
422 pdp_new[i] += (double)4294967296.0 ; /* 2^32 */
423 if (pdp_new[i] < (double)0.0 )
424 pdp_new[i] += (double)18446744069414584320.0; /* 2^64-2^32 */;
425 }
426 rate = pdp_new[i] / interval;
427 }
428 else {
429 pdp_new[i]= DNAN;
430 }
431 break;
432 case DST_ABSOLUTE:
433 pdp_new[i]= atof(updvals[i+1]);
434 rate = pdp_new[i] / interval;
435 break;
436 case DST_GAUGE:
437 pdp_new[i] = atof(updvals[i+1]) * interval;
438 rate = pdp_new[i] / interval;
439 break;
440 default:
441 rrd_set_error("rrd contains unknown DS type : '%s'",
442 rrd.ds_def[i].dst);
443 break;
444 }
445 /* break out of this for loop if the error string is set */
446 if (rrd_test_error()){
447 break;
448 }
449 /* make sure pdp_temp is neither too large or too small
450 * if any of these occur it becomes unknown ...
451 * sorry folks ... */
452 if ( ! isnan(rate) &&
453 (( ! isnan(rrd.ds_def[i].par[DS_max_val].u_val) &&
454 rate > rrd.ds_def[i].par[DS_max_val].u_val ) ||
455 ( ! isnan(rrd.ds_def[i].par[DS_min_val].u_val) &&
456 rate < rrd.ds_def[i].par[DS_min_val].u_val ))){
457 pdp_new[i] = DNAN;
458 }
459 } else {
460 /* no news is news all the same */
461 pdp_new[i] = DNAN;
462 }
464 /* make a copy of the command line argument for the next run */
465 #ifdef DEBUG
466 fprintf(stderr,
467 "prep ds[%lu]\t"
468 "last_arg '%s'\t"
469 "this_arg '%s'\t"
470 "pdp_new %10.2f\n",
471 i,
472 rrd.pdp_prep[i].last_ds,
473 updvals[i+1], pdp_new[i]);
474 #endif
475 if(dst_idx == DST_COUNTER || dst_idx == DST_DERIVE){
476 strncpy(rrd.pdp_prep[i].last_ds,
477 updvals[i+1],LAST_DS_LEN-1);
478 rrd.pdp_prep[i].last_ds[LAST_DS_LEN-1]='\0';
479 }
480 }
481 /* break out of the argument parsing loop if the error_string is set */
482 if (rrd_test_error()){
483 free(step_start);
484 break;
485 }
486 /* has a pdp_st moment occurred since the last run ? */
488 if (proc_pdp_st == occu_pdp_st){
489 /* no we have not passed a pdp_st moment. therefore update is simple */
491 for(i=0;i<rrd.stat_head->ds_cnt;i++){
492 if(isnan(pdp_new[i]))
493 rrd.pdp_prep[i].scratch[PDP_unkn_sec_cnt].u_cnt += interval;
494 else
495 rrd.pdp_prep[i].scratch[PDP_val].u_val+= pdp_new[i];
496 #ifdef DEBUG
497 fprintf(stderr,
498 "NO PDP ds[%lu]\t"
499 "value %10.2f\t"
500 "unkn_sec %5lu\n",
501 i,
502 rrd.pdp_prep[i].scratch[PDP_val].u_val,
503 rrd.pdp_prep[i].scratch[PDP_unkn_sec_cnt].u_cnt);
504 #endif
505 }
506 } else {
507 /* an pdp_st has occurred. */
509 /* in pdp_prep[].scratch[PDP_val].u_val we have collected rate*seconds which
510 * occurred up to the last run.
511 pdp_new[] contains rate*seconds from the latest run.
512 pdp_temp[] will contain the rate for cdp */
515 for(i=0;i<rrd.stat_head->ds_cnt;i++){
516 /* update pdp_prep to the current pdp_st */
517 if(isnan(pdp_new[i]))
518 rrd.pdp_prep[i].scratch[PDP_unkn_sec_cnt].u_cnt += pre_int;
519 else
520 rrd.pdp_prep[i].scratch[PDP_val].u_val +=
521 pdp_new[i]/(double)interval*(double)pre_int;
523 /* if too much of the pdp_prep is unknown we dump it */
524 if ((rrd.pdp_prep[i].scratch[PDP_unkn_sec_cnt].u_cnt
525 > rrd.ds_def[i].par[DS_mrhb_cnt].u_cnt) ||
526 (occu_pdp_st-proc_pdp_st <=
527 rrd.pdp_prep[i].scratch[PDP_unkn_sec_cnt].u_cnt)) {
528 pdp_temp[i] = DNAN;
529 } else {
530 pdp_temp[i] = rrd.pdp_prep[i].scratch[PDP_val].u_val
531 / (double)( occu_pdp_st
532 - proc_pdp_st
533 - rrd.pdp_prep[i].scratch[PDP_unkn_sec_cnt].u_cnt);
534 }
535 /* make pdp_prep ready for the next run */
536 if(isnan(pdp_new[i])){
537 rrd.pdp_prep[i].scratch[PDP_unkn_sec_cnt].u_cnt = post_int;
538 rrd.pdp_prep[i].scratch[PDP_val].u_val = 0.0;
539 } else {
540 rrd.pdp_prep[i].scratch[PDP_unkn_sec_cnt].u_cnt = 0;
541 rrd.pdp_prep[i].scratch[PDP_val].u_val =
542 pdp_new[i]/(double)interval*(double)post_int;
543 }
545 #ifdef DEBUG
546 fprintf(stderr,
547 "PDP UPD ds[%lu]\t"
548 "pdp_temp %10.2f\t"
549 "new_prep %10.2f\t"
550 "new_unkn_sec %5lu\n",
551 i, pdp_temp[i],
552 rrd.pdp_prep[i].scratch[PDP_val].u_val,
553 rrd.pdp_prep[i].scratch[PDP_unkn_sec_cnt].u_cnt);
554 #endif
555 }
557 /* compute the number of elapsed pdp_st moments */
558 elapsed_pdp_st = (occu_pdp_st - proc_pdp_st) / rrd.stat_head -> pdp_step;
559 #ifdef DEBUG
560 fprintf(stderr,"elapsed PDP steps: %lu\n", elapsed_pdp_st);
561 #endif
562 if (rra_step_cnt == NULL)
563 {
564 rra_step_cnt = (unsigned long *)
565 malloc((rrd.stat_head->rra_cnt)* sizeof(unsigned long));
566 }
568 for(i = 0, rra_start = rra_begin;
569 i < rrd.stat_head->rra_cnt;
570 rra_start += rrd.rra_def[i].row_cnt * rrd.stat_head -> ds_cnt * sizeof(rrd_value_t),
571 i++)
572 {
573 current_cf = cf_conv(rrd.rra_def[i].cf_nam);
574 start_pdp_offset = rrd.rra_def[i].pdp_cnt -
575 (proc_pdp_st / rrd.stat_head -> pdp_step) % rrd.rra_def[i].pdp_cnt;
576 if (start_pdp_offset <= elapsed_pdp_st) {
577 rra_step_cnt[i] = (elapsed_pdp_st - start_pdp_offset) /
578 rrd.rra_def[i].pdp_cnt + 1;
579 } else {
580 rra_step_cnt[i] = 0;
581 }
583 if (current_cf == CF_SEASONAL || current_cf == CF_DEVSEASONAL)
584 {
585 /* If this is a bulk update, we need to skip ahead in the seasonal
586 * arrays so that they will be correct for the next observed value;
587 * note that for the bulk update itself, no update will occur to
588 * DEVSEASONAL or SEASONAL; futhermore, HWPREDICT and DEVPREDICT will
589 * be set to DNAN. */
590 if (rra_step_cnt[i] > 2)
591 {
592 /* skip update by resetting rra_step_cnt[i],
593 * note that this is not data source specific; this is due
594 * to the bulk update, not a DNAN value for the specific data
595 * source. */
596 rra_step_cnt[i] = 0;
597 lookup_seasonal(&rrd,i,rra_start,rrd_file,elapsed_pdp_st,
598 &last_seasonal_coef);
599 lookup_seasonal(&rrd,i,rra_start,rrd_file,elapsed_pdp_st + 1,
600 &seasonal_coef);
601 }
603 /* periodically run a smoother for seasonal effects */
604 /* Need to use first cdp parameter buffer to track
605 * burnin (burnin requires a specific smoothing schedule).
606 * The CDP_init_seasonal parameter is really an RRA level,
607 * not a data source within RRA level parameter, but the rra_def
608 * is read only for rrd_update (not flushed to disk). */
609 iii = i*(rrd.stat_head -> ds_cnt);
610 if (rrd.cdp_prep[iii].scratch[CDP_init_seasonal].u_cnt
611 <= BURNIN_CYCLES)
612 {
613 if (rrd.rra_ptr[i].cur_row + elapsed_pdp_st
614 > rrd.rra_def[i].row_cnt - 1) {
615 /* mark off one of the burnin cycles */
616 ++(rrd.cdp_prep[iii].scratch[CDP_init_seasonal].u_cnt);
617 schedule_smooth = 1;
618 }
619 } else {
620 /* someone has no doubt invented a trick to deal with this
621 * wrap around, but at least this code is clear. */
622 if (rrd.rra_def[i].par[RRA_seasonal_smooth_idx].u_cnt >
623 rrd.rra_ptr[i].cur_row)
624 {
625 /* here elapsed_pdp_st = rra_step_cnt[i] because of 1-1
626 * mapping between PDP and CDP */
627 if (rrd.rra_ptr[i].cur_row + elapsed_pdp_st
628 >= rrd.rra_def[i].par[RRA_seasonal_smooth_idx].u_cnt)
629 {
630 #ifdef DEBUG
631 fprintf(stderr,
632 "schedule_smooth 1: cur_row %lu, elapsed_pdp_st %lu, smooth idx %lu\n",
633 rrd.rra_ptr[i].cur_row, elapsed_pdp_st,
634 rrd.rra_def[i].par[RRA_seasonal_smooth_idx].u_cnt);
635 #endif
636 schedule_smooth = 1;
637 }
638 } else {
639 /* can't rely on negative numbers because we are working with
640 * unsigned values */
641 /* Don't need modulus here. If we've wrapped more than once, only
642 * one smooth is executed at the end. */
643 if (rrd.rra_ptr[i].cur_row + elapsed_pdp_st >= rrd.rra_def[i].row_cnt
644 && rrd.rra_ptr[i].cur_row + elapsed_pdp_st - rrd.rra_def[i].row_cnt
645 >= rrd.rra_def[i].par[RRA_seasonal_smooth_idx].u_cnt)
646 {
647 #ifdef DEBUG
648 fprintf(stderr,
649 "schedule_smooth 2: cur_row %lu, elapsed_pdp_st %lu, smooth idx %lu\n",
650 rrd.rra_ptr[i].cur_row, elapsed_pdp_st,
651 rrd.rra_def[i].par[RRA_seasonal_smooth_idx].u_cnt);
652 #endif
653 schedule_smooth = 1;
654 }
655 }
656 }
658 rra_current = ftell(rrd_file);
659 } /* if cf is DEVSEASONAL or SEASONAL */
661 if (rrd_test_error()) break;
663 /* update CDP_PREP areas */
664 /* loop over data sources within each RRA */
665 for(ii = 0;
666 ii < rrd.stat_head->ds_cnt;
667 ii++)
668 {
670 /* iii indexes the CDP prep area for this data source within the RRA */
671 iii=i*rrd.stat_head->ds_cnt+ii;
673 if (rrd.rra_def[i].pdp_cnt > 1) {
675 if (rra_step_cnt[i] > 0) {
676 /* If we are in this block, as least 1 CDP value will be written to
677 * disk, this is the CDP_primary_val entry. If more than 1 value needs
678 * to be written, then the "fill in" value is the CDP_secondary_val
679 * entry. */
680 if (isnan(pdp_temp[ii]))
681 {
682 rrd.cdp_prep[iii].scratch[CDP_unkn_pdp_cnt].u_cnt += start_pdp_offset;
683 rrd.cdp_prep[iii].scratch[CDP_secondary_val].u_val = DNAN;
684 } else {
685 /* CDP_secondary value is the RRA "fill in" value for intermediary
686 * CDP data entries. No matter the CF, the value is the same because
687 * the average, max, min, and last of a list of identical values is
688 * the same, namely, the value itself. */
689 rrd.cdp_prep[iii].scratch[CDP_secondary_val].u_val = pdp_temp[ii];
690 }
692 if (rrd.cdp_prep[iii].scratch[CDP_unkn_pdp_cnt].u_cnt
693 > rrd.rra_def[i].pdp_cnt*
694 rrd.rra_def[i].par[RRA_cdp_xff_val].u_val)
695 {
696 rrd.cdp_prep[iii].scratch[CDP_primary_val].u_val = DNAN;
697 /* initialize carry over */
698 if (current_cf == CF_AVERAGE) {
699 if (isnan(pdp_temp[ii])) {
700 rrd.cdp_prep[iii].scratch[CDP_val].u_val = DNAN;
701 } else {
702 rrd.cdp_prep[iii].scratch[CDP_val].u_val = pdp_temp[ii] *
703 ((elapsed_pdp_st - start_pdp_offset) % rrd.rra_def[i].pdp_cnt);
704 }
705 } else {
706 rrd.cdp_prep[iii].scratch[CDP_val].u_val = pdp_temp[ii];
707 }
708 } else {
709 rrd_value_t cum_val, cur_val;
710 switch (current_cf) {
711 case CF_AVERAGE:
712 cum_val = IFDNAN(rrd.cdp_prep[iii].scratch[CDP_val].u_val, 0.0);
713 cur_val = IFDNAN(pdp_temp[ii],0.0);
714 rrd.cdp_prep[iii].scratch[CDP_primary_val].u_val =
715 (cum_val + cur_val) /
716 (rrd.rra_def[i].pdp_cnt
717 -rrd.cdp_prep[iii].scratch[CDP_unkn_pdp_cnt].u_cnt);
718 /* initialize carry over value */
719 if (isnan(pdp_temp[ii])) {
720 rrd.cdp_prep[iii].scratch[CDP_val].u_val = DNAN;
721 } else {
722 rrd.cdp_prep[iii].scratch[CDP_val].u_val = pdp_temp[ii] *
723 ((elapsed_pdp_st - start_pdp_offset) % rrd.rra_def[i].pdp_cnt);
724 }
725 break;
726 case CF_MAXIMUM:
727 cum_val = IFDNAN(rrd.cdp_prep[iii].scratch[CDP_val].u_val, -DINF);
728 cur_val = IFDNAN(pdp_temp[ii],-DINF);
729 #ifdef DEBUG
730 if (isnan(rrd.cdp_prep[iii].scratch[CDP_val].u_val) &&
731 isnan(pdp_temp[ii])) {
732 fprintf(stderr,
733 "RRA %lu, DS %lu, both CDP_val and pdp_temp are DNAN!",
734 i,ii);
735 exit(-1);
736 }
737 #endif
738 if (cur_val > cum_val)
739 rrd.cdp_prep[iii].scratch[CDP_primary_val].u_val = cur_val;
740 else
741 rrd.cdp_prep[iii].scratch[CDP_primary_val].u_val = cum_val;
742 /* initialize carry over value */
743 rrd.cdp_prep[iii].scratch[CDP_val].u_val = pdp_temp[ii];
744 break;
745 case CF_MINIMUM:
746 cum_val = IFDNAN(rrd.cdp_prep[iii].scratch[CDP_val].u_val, DINF);
747 cur_val = IFDNAN(pdp_temp[ii],DINF);
748 #ifdef DEBUG
749 if (isnan(rrd.cdp_prep[iii].scratch[CDP_val].u_val) &&
750 isnan(pdp_temp[ii])) {
751 fprintf(stderr,
752 "RRA %lu, DS %lu, both CDP_val and pdp_temp are DNAN!",
753 i,ii);
754 exit(-1);
755 }
756 #endif
757 if (cur_val < cum_val)
758 rrd.cdp_prep[iii].scratch[CDP_primary_val].u_val = cur_val;
759 else
760 rrd.cdp_prep[iii].scratch[CDP_primary_val].u_val = cum_val;
761 /* initialize carry over value */
762 rrd.cdp_prep[iii].scratch[CDP_val].u_val = pdp_temp[ii];
763 break;
764 case CF_LAST:
765 default:
766 rrd.cdp_prep[iii].scratch[CDP_primary_val].u_val = pdp_temp[ii];
767 /* initialize carry over value */
768 rrd.cdp_prep[iii].scratch[CDP_val].u_val = pdp_temp[ii];
769 break;
770 }
771 } /* endif meets xff value requirement for a valid value */
772 /* initialize carry over CDP_unkn_pdp_cnt, this must after CDP_primary_val
773 * is set because CDP_unkn_pdp_cnt is required to compute that value. */
774 if (isnan(pdp_temp[ii]))
775 rrd.cdp_prep[iii].scratch[CDP_unkn_pdp_cnt].u_cnt =
776 (elapsed_pdp_st - start_pdp_offset) % rrd.rra_def[i].pdp_cnt;
777 else
778 rrd.cdp_prep[iii].scratch[CDP_unkn_pdp_cnt].u_cnt = 0;
779 } else /* rra_step_cnt[i] == 0 */
780 {
781 #ifdef DEBUG
782 if (isnan(rrd.cdp_prep[iii].scratch[CDP_val].u_val)) {
783 fprintf(stderr,"schedule CDP_val update, RRA %lu DS %lu, DNAN\n",
784 i,ii);
785 } else {
786 fprintf(stderr,"schedule CDP_val update, RRA %lu DS %lu, %10.2f\n",
787 i,ii,rrd.cdp_prep[iii].scratch[CDP_val].u_val);
788 }
789 #endif
790 if (isnan(pdp_temp[ii])) {
791 rrd.cdp_prep[iii].scratch[CDP_unkn_pdp_cnt].u_cnt += elapsed_pdp_st;
792 } else if (isnan(rrd.cdp_prep[iii].scratch[CDP_val].u_val))
793 {
794 if (current_cf == CF_AVERAGE) {
795 rrd.cdp_prep[iii].scratch[CDP_val].u_val = pdp_temp[ii] *
796 elapsed_pdp_st;
797 } else {
798 rrd.cdp_prep[iii].scratch[CDP_val].u_val = pdp_temp[ii];
799 }
800 #ifdef DEBUG
801 fprintf(stderr,"Initialize CDP_val for RRA %lu DS %lu: %10.2f\n",
802 i,ii,rrd.cdp_prep[iii].scratch[CDP_val].u_val);
803 #endif
804 } else {
805 switch (current_cf) {
806 case CF_AVERAGE:
807 rrd.cdp_prep[iii].scratch[CDP_val].u_val += pdp_temp[ii] *
808 elapsed_pdp_st;
809 break;
810 case CF_MINIMUM:
811 if (pdp_temp[ii] < rrd.cdp_prep[iii].scratch[CDP_val].u_val)
812 rrd.cdp_prep[iii].scratch[CDP_val].u_val = pdp_temp[ii];
813 break;
814 case CF_MAXIMUM:
815 if (pdp_temp[ii] > rrd.cdp_prep[iii].scratch[CDP_val].u_val)
816 rrd.cdp_prep[iii].scratch[CDP_val].u_val = pdp_temp[ii];
817 break;
818 case CF_LAST:
819 default:
820 rrd.cdp_prep[iii].scratch[CDP_val].u_val = pdp_temp[ii];
821 break;
822 }
823 }
824 }
825 } else { /* rrd.rra_def[i].pdp_cnt == 1 */
826 if (elapsed_pdp_st > 2)
827 {
828 switch (current_cf) {
829 case CF_AVERAGE:
830 default:
831 rrd.cdp_prep[iii].scratch[CDP_primary_val].u_val=pdp_temp[ii];
832 rrd.cdp_prep[iii].scratch[CDP_secondary_val].u_val=pdp_temp[ii];
833 break;
834 case CF_SEASONAL:
835 case CF_DEVSEASONAL:
836 /* need to update cached seasonal values, so they are consistent
837 * with the bulk update */
838 /* WARNING: code relies on the fact that CDP_hw_last_seasonal and
839 * CDP_last_deviation are the same. */
840 rrd.cdp_prep[iii].scratch[CDP_hw_last_seasonal].u_val =
841 last_seasonal_coef[ii];
842 rrd.cdp_prep[iii].scratch[CDP_hw_seasonal].u_val =
843 seasonal_coef[ii];
844 break;
845 case CF_HWPREDICT:
846 /* need to update the null_count and last_null_count.
847 * even do this for non-DNAN pdp_temp because the
848 * algorithm is not learning from batch updates. */
849 rrd.cdp_prep[iii].scratch[CDP_null_count].u_cnt +=
850 elapsed_pdp_st;
851 rrd.cdp_prep[iii].scratch[CDP_last_null_count].u_cnt +=
852 elapsed_pdp_st - 1;
853 /* fall through */
854 case CF_DEVPREDICT:
855 rrd.cdp_prep[iii].scratch[CDP_primary_val].u_val = DNAN;
856 rrd.cdp_prep[iii].scratch[CDP_secondary_val].u_val = DNAN;
857 break;
858 case CF_FAILURES:
859 /* do not count missed bulk values as failures */
860 rrd.cdp_prep[iii].scratch[CDP_primary_val].u_val = 0;
861 rrd.cdp_prep[iii].scratch[CDP_secondary_val].u_val = 0;
862 /* need to reset violations buffer.
863 * could do this more carefully, but for now, just
864 * assume a bulk update wipes away all violations. */
865 erase_violations(&rrd, iii, i);
866 break;
867 }
868 }
869 } /* endif rrd.rra_def[i].pdp_cnt == 1 */
871 if (rrd_test_error()) break;
873 } /* endif data sources loop */
874 } /* end RRA Loop */
876 /* this loop is only entered if elapsed_pdp_st < 3 */
877 for (j = elapsed_pdp_st, scratch_idx = CDP_primary_val;
878 j > 0 && j < 3; j--, scratch_idx = CDP_secondary_val)
879 {
880 for(i = 0, rra_start = rra_begin;
881 i < rrd.stat_head->rra_cnt;
882 rra_start += rrd.rra_def[i].row_cnt * rrd.stat_head -> ds_cnt * sizeof(rrd_value_t),
883 i++)
884 {
885 if (rrd.rra_def[i].pdp_cnt > 1) continue;
887 current_cf = cf_conv(rrd.rra_def[i].cf_nam);
888 if (current_cf == CF_SEASONAL || current_cf == CF_DEVSEASONAL)
889 {
890 lookup_seasonal(&rrd,i,rra_start,rrd_file,
891 elapsed_pdp_st + (scratch_idx == CDP_primary_val ? 1 : 2),
892 &seasonal_coef);
893 }
894 if (rrd_test_error()) break;
895 /* loop over data sources within each RRA */
896 for(ii = 0;
897 ii < rrd.stat_head->ds_cnt;
898 ii++)
899 {
900 update_aberrant_CF(&rrd,pdp_temp[ii],current_cf,
901 i*(rrd.stat_head->ds_cnt) + ii,i,ii,
902 scratch_idx, seasonal_coef);
903 }
904 } /* end RRA Loop */
905 if (rrd_test_error()) break;
906 } /* end elapsed_pdp_st loop */
908 if (rrd_test_error()) break;
910 /* Ready to write to disk */
911 /* Move sequentially through the file, writing one RRA at a time.
912 * Note this architecture divorces the computation of CDP with
913 * flushing updated RRA entries to disk. */
914 for(i = 0, rra_start = rra_begin;
915 i < rrd.stat_head->rra_cnt;
916 rra_start += rrd.rra_def[i].row_cnt * rrd.stat_head -> ds_cnt * sizeof(rrd_value_t),
917 i++) {
918 /* is there anything to write for this RRA? If not, continue. */
919 if (rra_step_cnt[i] == 0) continue;
921 /* write the first row */
922 #ifdef DEBUG
923 fprintf(stderr," -- RRA Preseek %ld\n",ftell(rrd_file));
924 #endif
925 rrd.rra_ptr[i].cur_row++;
926 if (rrd.rra_ptr[i].cur_row >= rrd.rra_def[i].row_cnt)
927 rrd.rra_ptr[i].cur_row = 0; /* wrap around */
928 /* position on the first row */
929 rra_pos_tmp = rra_start +
930 (rrd.stat_head->ds_cnt)*(rrd.rra_ptr[i].cur_row)*sizeof(rrd_value_t);
931 if(rra_pos_tmp != rra_current) {
932 if(fseek(rrd_file, rra_pos_tmp, SEEK_SET) != 0){
933 rrd_set_error("seek error in rrd");
934 break;
935 }
936 rra_current = rra_pos_tmp;
937 }
938 #ifdef DEBUG
939 fprintf(stderr," -- RRA Postseek %ld\n",ftell(rrd_file));
940 #endif
941 scratch_idx = CDP_primary_val;
942 write_RRA_row(&rrd, i, &rra_current, scratch_idx, rrd_file);
943 if (rrd_test_error()) break;
945 /* write other rows of the bulk update, if any */
946 scratch_idx = CDP_secondary_val;
947 for ( ; rra_step_cnt[i] > 1;
948 rra_step_cnt[i]--, rrd.rra_ptr[i].cur_row++)
949 {
950 if (rrd.rra_ptr[i].cur_row == rrd.rra_def[i].row_cnt)
951 {
952 #ifdef DEBUG
953 fprintf(stderr,"Wraparound for RRA %s, %lu updates left\n",
954 rrd.rra_def[i].cf_nam, rra_step_cnt[i] - 1);
955 #endif
956 /* wrap */
957 rrd.rra_ptr[i].cur_row = 0;
958 /* seek back to beginning of current rra */
959 if (fseek(rrd_file, rra_start, SEEK_SET) != 0)
960 {
961 rrd_set_error("seek error in rrd");
962 break;
963 }
964 #ifdef DEBUG
965 fprintf(stderr," -- Wraparound Postseek %ld\n",ftell(rrd_file));
966 #endif
967 rra_current = rra_start;
968 }
969 write_RRA_row(&rrd, i, &rra_current, scratch_idx, rrd_file);
970 }
972 if (rrd_test_error())
973 break;
974 } /* RRA LOOP */
976 /* break out of the argument parsing loop if error_string is set */
977 if (rrd_test_error()){
978 free(step_start);
979 break;
980 }
982 } /* endif a pdp_st has occurred */
983 rrd.live_head->last_up = current_time;
984 free(step_start);
985 } /* function argument loop */
987 if (seasonal_coef != NULL) free(seasonal_coef);
988 if (last_seasonal_coef != NULL) free(last_seasonal_coef);
989 if (rra_step_cnt != NULL) free(rra_step_cnt);
991 /* if we got here and if there is an error and if the file has not been
992 * written to, then close things up and return. */
993 if (rrd_test_error()) {
994 free(updvals);
995 free(tmpl_idx);
996 rrd_free(&rrd);
997 free(pdp_temp);
998 free(pdp_new);
999 fclose(rrd_file);
1000 return(-1);
1001 }
1003 /* aargh ... that was tough ... so many loops ... anyway, its done.
1004 * we just need to write back the live header portion now*/
1006 if (fseek(rrd_file, (sizeof(stat_head_t)
1007 + sizeof(ds_def_t)*rrd.stat_head->ds_cnt
1008 + sizeof(rra_def_t)*rrd.stat_head->rra_cnt),
1009 SEEK_SET) != 0) {
1010 rrd_set_error("seek rrd for live header writeback");
1011 free(updvals);
1012 free(tmpl_idx);
1013 rrd_free(&rrd);
1014 free(pdp_temp);
1015 free(pdp_new);
1016 fclose(rrd_file);
1017 return(-1);
1018 }
1020 if(fwrite( rrd.live_head,
1021 sizeof(live_head_t), 1, rrd_file) != 1){
1022 rrd_set_error("fwrite live_head to rrd");
1023 free(updvals);
1024 rrd_free(&rrd);
1025 free(tmpl_idx);
1026 free(pdp_temp);
1027 free(pdp_new);
1028 fclose(rrd_file);
1029 return(-1);
1030 }
1032 if(fwrite( rrd.pdp_prep,
1033 sizeof(pdp_prep_t),
1034 rrd.stat_head->ds_cnt, rrd_file) != rrd.stat_head->ds_cnt){
1035 rrd_set_error("ftwrite pdp_prep to rrd");
1036 free(updvals);
1037 rrd_free(&rrd);
1038 free(tmpl_idx);
1039 free(pdp_temp);
1040 free(pdp_new);
1041 fclose(rrd_file);
1042 return(-1);
1043 }
1045 if(fwrite( rrd.cdp_prep,
1046 sizeof(cdp_prep_t),
1047 rrd.stat_head->rra_cnt *rrd.stat_head->ds_cnt, rrd_file)
1048 != rrd.stat_head->rra_cnt *rrd.stat_head->ds_cnt){
1050 rrd_set_error("ftwrite cdp_prep to rrd");
1051 free(updvals);
1052 free(tmpl_idx);
1053 rrd_free(&rrd);
1054 free(pdp_temp);
1055 free(pdp_new);
1056 fclose(rrd_file);
1057 return(-1);
1058 }
1060 if(fwrite( rrd.rra_ptr,
1061 sizeof(rra_ptr_t),
1062 rrd.stat_head->rra_cnt,rrd_file) != rrd.stat_head->rra_cnt){
1063 rrd_set_error("fwrite rra_ptr to rrd");
1064 free(updvals);
1065 free(tmpl_idx);
1066 rrd_free(&rrd);
1067 free(pdp_temp);
1068 free(pdp_new);
1069 fclose(rrd_file);
1070 return(-1);
1071 }
1073 /* OK now close the files and free the memory */
1074 if(fclose(rrd_file) != 0){
1075 rrd_set_error("closing rrd");
1076 free(updvals);
1077 free(tmpl_idx);
1078 rrd_free(&rrd);
1079 free(pdp_temp);
1080 free(pdp_new);
1081 return(-1);
1082 }
1084 /* calling the smoothing code here guarantees at most
1085 * one smoothing operation per rrd_update call. Unfortunately,
1086 * it is possible with bulk updates, or a long-delayed update
1087 * for smoothing to occur off-schedule. This really isn't
1088 * critical except during the burning cycles. */
1089 if (schedule_smooth)
1090 {
1091 #ifndef WIN32
1092 rrd_file = fopen(argv[optind],"r+");
1093 #else
1094 rrd_file = fopen(argv[optind],"rb+");
1095 #endif
1096 rra_start = rra_begin;
1097 for (i = 0; i < rrd.stat_head -> rra_cnt; ++i)
1098 {
1099 if (cf_conv(rrd.rra_def[i].cf_nam) == CF_DEVSEASONAL ||
1100 cf_conv(rrd.rra_def[i].cf_nam) == CF_SEASONAL)
1101 {
1102 #ifdef DEBUG
1103 fprintf(stderr,"Running smoother for rra %ld\n",i);
1104 #endif
1105 apply_smoother(&rrd,i,rra_start,rrd_file);
1106 if (rrd_test_error())
1107 break;
1108 }
1109 rra_start += rrd.rra_def[i].row_cnt
1110 *rrd.stat_head->ds_cnt*sizeof(rrd_value_t);
1111 }
1112 fclose(rrd_file);
1113 }
1114 rrd_free(&rrd);
1115 free(updvals);
1116 free(tmpl_idx);
1117 free(pdp_new);
1118 free(pdp_temp);
1119 return(0);
1120 }
1122 /*
1123 * get exclusive lock to whole file.
1124 * lock gets removed when we close the file
1125 *
1126 * returns 0 on success
1127 */
int
LockRRD(FILE *rrdfile)
{
    /* Acquire an exclusive (write) lock covering the whole RRD file.
     * The lock is released automatically when the file is closed.
     * Returns 0 on success, non-zero (typically -1) on failure. */
    int rrd_fd;		/* File descriptor for RRD */
    int rc;		/* result of the platform locking call */

    rrd_fd = fileno(rrdfile);

    {
#ifndef WIN32
	struct flock	lock;
	lock.l_type = F_WRLCK;    /* exclusive write lock */
	lock.l_len = 0;	          /* 0 == lock to EOF, i.e. whole file */
	lock.l_start = 0;	  /* from the first byte ... */
	lock.l_whence = SEEK_SET; /* ... relative to the start of the file */

	/* F_SETLK is non-blocking: fails immediately if already locked */
	rc = fcntl(rrd_fd, F_SETLK, &lock);
#else
	struct _stat st;

	/* _locking() locks a byte range starting at the current file
	 * position; lock the file's full size, non-blocking. */
	if ( _fstat( rrd_fd, &st ) == 0 ) {
	    rc = _locking ( rrd_fd, _LK_NBLCK, st.st_size );
	} else {
	    rc = -1;
	}
#endif
    }

    return(rc);
}
1160 void
1161 write_RRA_row (rrd_t *rrd, unsigned long rra_idx, unsigned long *rra_current,
1162 unsigned short CDP_scratch_idx, FILE *rrd_file)
1163 {
1164 unsigned long ds_idx, cdp_idx;
1166 for (ds_idx = 0; ds_idx < rrd -> stat_head -> ds_cnt; ds_idx++)
1167 {
1168 /* compute the cdp index */
1169 cdp_idx =rra_idx * (rrd -> stat_head->ds_cnt) + ds_idx;
1170 #ifdef DEBUG
1171 fprintf(stderr," -- RRA WRITE VALUE %e, at %ld CF:%s\n",
1172 rrd -> cdp_prep[cdp_idx].scratch[CDP_scratch_idx].u_val,ftell(rrd_file),
1173 rrd -> rra_def[rra_idx].cf_nam);
1174 #endif
1176 if(fwrite(&(rrd -> cdp_prep[cdp_idx].scratch[CDP_scratch_idx].u_val),
1177 sizeof(rrd_value_t),1,rrd_file) != 1)
1178 {
1179 rrd_set_error("writing rrd");
1180 return;
1181 }
1182 *rra_current += sizeof(rrd_value_t);
1183 }
1184 }