1 /*****************************************************************************
2 * RRDtool 1.1.x Copyright Tobias Oetiker, 1997 - 2002
3 *****************************************************************************
4 * rrd_update.c RRD Update Function
5 *****************************************************************************
6 * $Id$
7 * $Log$
8 * Revision 1.7 2003/02/13 07:05:27 oetiker
9 * Find attached the patch I promised to send to you. Please note that there
10 * are three new source files (src/rrd_is_thread_safe.h, src/rrd_thread_safe.c
11 * and src/rrd_not_thread_safe.c) and the introduction of librrd_th. This
12 * library is identical to librrd, but it contains support code for per-thread
13 * global variables currently used for error information only. This is similar
14 * to how errno per-thread variables are implemented. librrd_th must be linked
15 * alongside of libpthred
16 *
17 * There is also a new file "THREADS", holding some documentation.
18 *
19 * -- Peter Stamfest <peter@stamfest.at>
20 *
21 * Revision 1.6 2002/02/01 20:34:49 oetiker
22 * fixed version number and date/time
23 *
24 * Revision 1.5 2001/05/09 05:31:01 oetiker
25 * Bug fix: when update of multiple PDP/CDP RRAs coincided
26 * with interpolation of multiple PDPs an incorrect value was
27 * stored as the CDP. Especially evident for GAUGE data sources.
28 * Minor changes to rrdcreate.pod. -- Jake Brutlag <jakeb@corp.webtv.net>
29 *
30 * Revision 1.4 2001/03/10 23:54:41 oetiker
31 * Support for COMPUTE data sources (CDEF data sources). Removes the RPN
32 * parser and calculator from rrd_graph and puts then in a new file,
33 * rrd_rpncalc.c. Changes to core files rrd_create and rrd_update. Some
34 * clean-up of aberrant behavior stuff, including a bug fix.
35 * Documentation update (rrdcreate.pod, rrdupdate.pod). Change xml format.
36 * -- Jake Brutlag <jakeb@corp.webtv.net>
37 *
38 * Revision 1.3 2001/03/04 13:01:55 oetiker
39 * Aberrant Behavior Detection support. A brief overview added to rrdtool.pod.
40 * Major updates to rrd_update.c, rrd_create.c. Minor update to other core files.
41 * This is backwards compatible! But new files using the Aberrant stuff are not readable
42 * by old rrdtool versions. See http://cricket.sourceforge.net/aberrant/rrd_hw.htm
43 * -- Jake Brutlag <jakeb@corp.webtv.net>
44 *
45 * Revision 1.2 2001/03/04 11:14:25 oetiker
46 * added at-style-time@value:value syntax to rrd_update
47 * -- Dave Bodenstab <imdave@mcs.net>
48 *
49 * Revision 1.1.1.1 2001/02/25 22:25:06 oetiker
50 * checkin
51 *
52 *****************************************************************************/
54 #include "rrd_tool.h"
55 #include <sys/types.h>
56 #include <fcntl.h>
58 #ifdef WIN32
59 #include <sys/locking.h>
60 #include <sys/stat.h>
61 #include <io.h>
62 #endif
64 #include "rrd_hw.h"
65 #include "rrd_rpncalc.h"
67 #include "rrd_is_thread_safe.h"
69 /* Local prototypes */
70 int LockRRD(FILE *rrd_file);
71 void write_RRA_row (rrd_t *rrd, unsigned long rra_idx,
72 unsigned long *rra_current,
73 unsigned short CDP_scratch_idx, FILE *rrd_file);
74 int rrd_update_r(char *filename, char *template, int argc, char **argv);
76 #define IFDNAN(X,Y) (isnan(X) ? (Y) : (X));
79 #ifdef STANDALONE
80 int
81 main(int argc, char **argv){
82 rrd_update(argc,argv);
83 if (rrd_test_error()) {
84 printf("RRDtool 1.1.x Copyright 1997-2000 by Tobias Oetiker <tobi@oetiker.ch>\n\n"
85 "Usage: rrdupdate filename\n"
86 "\t\t\t[--template|-t ds-name:ds-name:...]\n"
87 "\t\t\ttime|N:value[:value...]\n\n"
88 "\t\t\tat-time@value[:value...]\n\n"
89 "\t\t\t[ time:value[:value...] ..]\n\n");
91 printf("ERROR: %s\n",rrd_get_error());
92 rrd_clear_error();
93 return 1;
94 }
95 return 0;
96 }
97 #endif
99 int
100 rrd_update(int argc, char **argv)
101 {
102 char *template = NULL;
103 int rc;
105 while (1) {
106 static struct option long_options[] =
107 {
108 {"template", required_argument, 0, 't'},
109 {0,0,0,0}
110 };
111 int option_index = 0;
112 int opt;
113 opt = getopt_long(argc, argv, "t:",
114 long_options, &option_index);
116 if (opt == EOF)
117 break;
119 switch(opt) {
120 case 't':
121 template = optarg;
122 break;
124 case '?':
125 rrd_set_error("unknown option '%s'",argv[optind-1]);
126 /* rrd_free(&rrd); */
127 return(-1);
128 }
129 }
131 /* need at least 2 arguments: filename, data. */
132 if (argc-optind < 2) {
133 rrd_set_error("Not enough arguments");
135 return -1;
136 }
138 rc = rrd_update_r(argv[optind], template,
139 argc - optind - 1, argv + optind + 1);
140 return rc;
141 }
143 int
144 rrd_update_r(char *filename, char *template, int argc, char **argv)
145 {
147 int arg_i = 2;
148 short j;
149 unsigned long i,ii,iii=1;
151 unsigned long rra_begin; /* byte pointer to the rra
152 * area in the rrd file. this
153 * pointer never changes value */
154 unsigned long rra_start; /* byte pointer to the rra
155 * area in the rrd file. this
156 * pointer changes as each rrd is
157 * processed. */
158 unsigned long rra_current; /* byte pointer to the current write
159 * spot in the rrd file. */
160 unsigned long rra_pos_tmp; /* temporary byte pointer. */
161 unsigned long interval,
162 pre_int,post_int; /* interval between this and
163 * the last run */
164 unsigned long proc_pdp_st; /* which pdp_st was the last
165 * to be processed */
166 unsigned long occu_pdp_st; /* when was the pdp_st
167 * before the last update
168 * time */
169 unsigned long proc_pdp_age; /* how old was the data in
170 * the pdp prep area when it
171 * was last updated */
172 unsigned long occu_pdp_age; /* how long ago was the last
173 * pdp_step time */
174 rrd_value_t *pdp_new; /* prepare the incoming data
175 * to be added the the
176 * existing entry */
177 rrd_value_t *pdp_temp; /* prepare the pdp values
178 * to be added the the
179 * cdp values */
181 long *tmpl_idx; /* index representing the settings
182 transported by the template index */
183 unsigned long tmpl_cnt = 2; /* time and data */
185 FILE *rrd_file;
186 rrd_t rrd;
187 time_t current_time = time(NULL);
188 char **updvals;
189 int schedule_smooth = 0;
190 rrd_value_t *seasonal_coef = NULL, *last_seasonal_coef = NULL;
191 /* a vector of future Holt-Winters seasonal coefs */
192 unsigned long elapsed_pdp_st;
193 /* number of elapsed PDP steps since last update */
194 unsigned long *rra_step_cnt = NULL;
195 /* number of rows to be updated in an RRA for a data
196 * value. */
197 unsigned long start_pdp_offset;
198 /* number of PDP steps since the last update that
199 * are assigned to the first CDP to be generated
200 * since the last update. */
201 unsigned short scratch_idx;
202 /* index into the CDP scratch array */
203 enum cf_en current_cf;
204 /* numeric id of the current consolidation function */
205 rpnstack_t rpnstack; /* used for COMPUTE DS */
207 rpnstack_init(&rpnstack);
209 /* need at least 1 arguments: data. */
210 if (argc < 1) {
211 rrd_set_error("Not enough arguments");
212 return -1;
213 }
215 if(rrd_open(filename,&rrd_file,&rrd, RRD_READWRITE)==-1){
216 return -1;
217 }
218 rra_current = rra_start = rra_begin = ftell(rrd_file);
219 /* This is defined in the ANSI C standard, section 7.9.5.3:
221 When a file is opened with udpate mode ('+' as the second
222 or third character in the ... list of mode argument
223 variables), both input and ouptut may be performed on the
224 associated stream. However, ... input may not be directly
225 followed by output without an intervening call to a file
226 positioning function, unless the input oepration encounters
227 end-of-file. */
228 fseek(rrd_file, 0, SEEK_CUR);
231 /* get exclusive lock to whole file.
232 * lock gets removed when we close the file.
233 */
234 if (LockRRD(rrd_file) != 0) {
235 rrd_set_error("could not lock RRD");
236 rrd_free(&rrd);
237 fclose(rrd_file);
238 return(-1);
239 }
241 if((updvals = malloc( sizeof(char*) * (rrd.stat_head->ds_cnt+1)))==NULL){
242 rrd_set_error("allocating updvals pointer array");
243 rrd_free(&rrd);
244 fclose(rrd_file);
245 return(-1);
246 }
248 if ((pdp_temp = malloc(sizeof(rrd_value_t)
249 *rrd.stat_head->ds_cnt))==NULL){
250 rrd_set_error("allocating pdp_temp ...");
251 free(updvals);
252 rrd_free(&rrd);
253 fclose(rrd_file);
254 return(-1);
255 }
257 if ((tmpl_idx = malloc(sizeof(unsigned long)
258 *(rrd.stat_head->ds_cnt+1)))==NULL){
259 rrd_set_error("allocating tmpl_idx ...");
260 free(pdp_temp);
261 free(updvals);
262 rrd_free(&rrd);
263 fclose(rrd_file);
264 return(-1);
265 }
266 /* initialize template redirector */
267 /* default config example (assume DS 1 is a CDEF DS)
268 tmpl_idx[0] -> 0; (time)
269 tmpl_idx[1] -> 1; (DS 0)
270 tmpl_idx[2] -> 3; (DS 2)
271 tmpl_idx[3] -> 4; (DS 3) */
272 tmpl_idx[0] = 0; /* time */
273 for (i = 1, ii = 1 ; i <= rrd.stat_head->ds_cnt ; i++)
274 {
275 if (dst_conv(rrd.ds_def[i-1].dst) != DST_CDEF)
276 tmpl_idx[ii++]=i;
277 }
278 tmpl_cnt= ii;
280 if (template) {
281 char *dsname;
282 unsigned int tmpl_len;
283 dsname = template;
284 tmpl_cnt = 1; /* the first entry is the time */
285 tmpl_len = strlen(template);
286 for(i=0;i<=tmpl_len ;i++) {
287 if (template[i] == ':' || template[i] == '\0') {
288 template[i] = '\0';
289 if (tmpl_cnt>rrd.stat_head->ds_cnt){
290 rrd_set_error("Template contains more DS definitions than RRD");
291 free(updvals); free(pdp_temp);
292 free(tmpl_idx); rrd_free(&rrd);
293 fclose(rrd_file); return(-1);
294 }
295 if ((tmpl_idx[tmpl_cnt++] = ds_match(&rrd,dsname)) == -1){
296 rrd_set_error("unknown DS name '%s'",dsname);
297 free(updvals); free(pdp_temp);
298 free(tmpl_idx); rrd_free(&rrd);
299 fclose(rrd_file); return(-1);
300 } else {
301 /* the first element is always the time */
302 tmpl_idx[tmpl_cnt-1]++;
303 /* go to the next entry on the template */
304 dsname = &template[i+1];
305 /* fix the damage we did before */
306 if (i<tmpl_len) {
307 template[i]=':';
308 }
310 }
311 }
312 }
313 }
314 if ((pdp_new = malloc(sizeof(rrd_value_t)
315 *rrd.stat_head->ds_cnt))==NULL){
316 rrd_set_error("allocating pdp_new ...");
317 free(updvals);
318 free(pdp_temp);
319 free(tmpl_idx);
320 rrd_free(&rrd);
321 fclose(rrd_file);
322 return(-1);
323 }
325 /* loop through the arguments. */
326 for(arg_i=0; arg_i<argc;arg_i++) {
327 char *stepper = malloc((strlen(argv[arg_i])+1)*sizeof(char));
328 char *step_start = stepper;
329 char *p;
330 char *parsetime_error = NULL;
331 enum {atstyle, normal} timesyntax;
332 struct time_value ds_tv;
333 if (stepper == NULL){
334 rrd_set_error("failed duplication argv entry");
335 free(updvals);
336 free(pdp_temp);
337 free(tmpl_idx);
338 rrd_free(&rrd);
339 fclose(rrd_file);
340 return(-1);
341 }
342 /* initialize all ds input to unknown except the first one
343 which has always got to be set */
344 for(ii=1;ii<=rrd.stat_head->ds_cnt;ii++) updvals[ii] = "U";
345 strcpy(stepper,argv[arg_i]);
346 updvals[0]=stepper;
347 /* separate all ds elements; first must be examined separately
348 due to alternate time syntax */
349 if ((p=strchr(stepper,'@'))!=NULL) {
350 timesyntax = atstyle;
351 *p = '\0';
352 stepper = p+1;
353 } else if ((p=strchr(stepper,':'))!=NULL) {
354 timesyntax = normal;
355 *p = '\0';
356 stepper = p+1;
357 } else {
358 rrd_set_error("expected timestamp not found in data source from %s:...",
359 argv[arg_i]);
360 free(step_start);
361 break;
362 }
363 ii=1;
364 updvals[tmpl_idx[ii]] = stepper;
365 while (*stepper) {
366 if (*stepper == ':') {
367 *stepper = '\0';
368 ii++;
369 if (ii<tmpl_cnt){
370 updvals[tmpl_idx[ii]] = stepper+1;
371 }
372 }
373 stepper++;
374 }
376 if (ii != tmpl_cnt-1) {
377 rrd_set_error("expected %lu data source readings (got %lu) from %s:...",
378 tmpl_cnt-1, ii, argv[arg_i]);
379 free(step_start);
380 break;
381 }
383 /* get the time from the reading ... handle N */
384 if (timesyntax == atstyle) {
385 if ((parsetime_error = parsetime(updvals[0], &ds_tv))) {
386 rrd_set_error("ds time: %s: %s", updvals[0], parsetime_error );
387 free(step_start);
388 break;
389 }
390 if (ds_tv.type == RELATIVE_TO_END_TIME ||
391 ds_tv.type == RELATIVE_TO_START_TIME) {
392 rrd_set_error("specifying time relative to the 'start' "
393 "or 'end' makes no sense here: %s",
394 updvals[0]);
395 free(step_start);
396 break;
397 }
399 current_time = mktime(&ds_tv.tm) + ds_tv.offset;
400 } else if (strcmp(updvals[0],"N")==0){
401 current_time = time(NULL);
402 } else {
403 current_time = atol(updvals[0]);
404 }
406 if(current_time <= rrd.live_head->last_up){
407 rrd_set_error("illegal attempt to update using time %ld when "
408 "last update time is %ld (minimum one second step)",
409 current_time, rrd.live_head->last_up);
410 free(step_start);
411 break;
412 }
415 /* seek to the beginning of the rra's */
416 if (rra_current != rra_begin) {
417 if(fseek(rrd_file, rra_begin, SEEK_SET) != 0) {
418 rrd_set_error("seek error in rrd");
419 free(step_start);
420 break;
421 }
422 rra_current = rra_begin;
423 }
424 rra_start = rra_begin;
426 /* when was the current pdp started */
427 proc_pdp_age = rrd.live_head->last_up % rrd.stat_head->pdp_step;
428 proc_pdp_st = rrd.live_head->last_up - proc_pdp_age;
430 /* when did the last pdp_st occur */
431 occu_pdp_age = current_time % rrd.stat_head->pdp_step;
432 occu_pdp_st = current_time - occu_pdp_age;
433 interval = current_time - rrd.live_head->last_up;
435 if (occu_pdp_st > proc_pdp_st){
436 /* OK we passed the pdp_st moment*/
437 pre_int = occu_pdp_st - rrd.live_head->last_up; /* how much of the input data
438 * occurred before the latest
439 * pdp_st moment*/
440 post_int = occu_pdp_age; /* how much after it */
441 } else {
442 pre_int = interval;
443 post_int = 0;
444 }
446 #ifdef DEBUG
447 printf(
448 "proc_pdp_age %lu\t"
449 "proc_pdp_st %lu\t"
450 "occu_pfp_age %lu\t"
451 "occu_pdp_st %lu\t"
452 "int %lu\t"
453 "pre_int %lu\t"
454 "post_int %lu\n", proc_pdp_age, proc_pdp_st,
455 occu_pdp_age, occu_pdp_st,
456 interval, pre_int, post_int);
457 #endif
459 /* process the data sources and update the pdp_prep
460 * area accordingly */
461 for(i=0;i<rrd.stat_head->ds_cnt;i++){
462 enum dst_en dst_idx;
463 dst_idx= dst_conv(rrd.ds_def[i].dst);
464 /* NOTE: DST_CDEF should never enter this if block, because
465 * updvals[i+1][0] is initialized to 'U'; unless the caller
466 * accidently specified a value for the DST_CDEF. To handle
467 * this case, an extra check is required. */
468 if((updvals[i+1][0] != 'U') &&
469 (dst_idx != DST_CDEF) &&
470 rrd.ds_def[i].par[DS_mrhb_cnt].u_cnt >= interval) {
471 double rate = DNAN;
472 /* the data source type defines how to process the data */
473 /* pdp_new contains rate * time ... eg the bytes
474 * transferred during the interval. Doing it this way saves
475 * a lot of math operations */
478 switch(dst_idx){
479 case DST_COUNTER:
480 case DST_DERIVE:
481 if(rrd.pdp_prep[i].last_ds[0] != 'U'){
482 pdp_new[i]= rrd_diff(updvals[i+1],rrd.pdp_prep[i].last_ds);
483 if(dst_idx == DST_COUNTER) {
484 /* simple overflow catcher sugestet by andres kroonmaa */
485 /* this will fail terribly for non 32 or 64 bit counters ... */
486 /* are there any others in SNMP land ? */
487 if (pdp_new[i] < (double)0.0 )
488 pdp_new[i] += (double)4294967296.0 ; /* 2^32 */
489 if (pdp_new[i] < (double)0.0 )
490 pdp_new[i] += (double)18446744069414584320.0; /* 2^64-2^32 */;
491 }
492 rate = pdp_new[i] / interval;
493 }
494 else {
495 pdp_new[i]= DNAN;
496 }
497 break;
498 case DST_ABSOLUTE:
499 pdp_new[i]= atof(updvals[i+1]);
500 rate = pdp_new[i] / interval;
501 break;
502 case DST_GAUGE:
503 pdp_new[i] = atof(updvals[i+1]) * interval;
504 rate = pdp_new[i] / interval;
505 break;
506 default:
507 rrd_set_error("rrd contains unknown DS type : '%s'",
508 rrd.ds_def[i].dst);
509 break;
510 }
511 /* break out of this for loop if the error string is set */
512 if (rrd_test_error()){
513 break;
514 }
515 /* make sure pdp_temp is neither too large or too small
516 * if any of these occur it becomes unknown ...
517 * sorry folks ... */
518 if ( ! isnan(rate) &&
519 (( ! isnan(rrd.ds_def[i].par[DS_max_val].u_val) &&
520 rate > rrd.ds_def[i].par[DS_max_val].u_val ) ||
521 ( ! isnan(rrd.ds_def[i].par[DS_min_val].u_val) &&
522 rate < rrd.ds_def[i].par[DS_min_val].u_val ))){
523 pdp_new[i] = DNAN;
524 }
525 } else {
526 /* no news is news all the same */
527 pdp_new[i] = DNAN;
528 }
530 /* make a copy of the command line argument for the next run */
531 #ifdef DEBUG
532 fprintf(stderr,
533 "prep ds[%lu]\t"
534 "last_arg '%s'\t"
535 "this_arg '%s'\t"
536 "pdp_new %10.2f\n",
537 i,
538 rrd.pdp_prep[i].last_ds,
539 updvals[i+1], pdp_new[i]);
540 #endif
541 if(dst_idx == DST_COUNTER || dst_idx == DST_DERIVE){
542 strncpy(rrd.pdp_prep[i].last_ds,
543 updvals[i+1],LAST_DS_LEN-1);
544 rrd.pdp_prep[i].last_ds[LAST_DS_LEN-1]='\0';
545 }
546 }
547 /* break out of the argument parsing loop if the error_string is set */
548 if (rrd_test_error()){
549 free(step_start);
550 break;
551 }
552 /* has a pdp_st moment occurred since the last run ? */
554 if (proc_pdp_st == occu_pdp_st){
555 /* no we have not passed a pdp_st moment. therefore update is simple */
557 for(i=0;i<rrd.stat_head->ds_cnt;i++){
558 if(isnan(pdp_new[i]))
559 rrd.pdp_prep[i].scratch[PDP_unkn_sec_cnt].u_cnt += interval;
560 else
561 rrd.pdp_prep[i].scratch[PDP_val].u_val+= pdp_new[i];
562 #ifdef DEBUG
563 fprintf(stderr,
564 "NO PDP ds[%lu]\t"
565 "value %10.2f\t"
566 "unkn_sec %5lu\n",
567 i,
568 rrd.pdp_prep[i].scratch[PDP_val].u_val,
569 rrd.pdp_prep[i].scratch[PDP_unkn_sec_cnt].u_cnt);
570 #endif
571 }
572 } else {
573 /* an pdp_st has occurred. */
575 /* in pdp_prep[].scratch[PDP_val].u_val we have collected rate*seconds which
576 * occurred up to the last run.
577 pdp_new[] contains rate*seconds from the latest run.
578 pdp_temp[] will contain the rate for cdp */
580 for(i=0;i<rrd.stat_head->ds_cnt;i++){
581 /* update pdp_prep to the current pdp_st */
582 if(isnan(pdp_new[i]))
583 rrd.pdp_prep[i].scratch[PDP_unkn_sec_cnt].u_cnt += pre_int;
584 else
585 rrd.pdp_prep[i].scratch[PDP_val].u_val +=
586 pdp_new[i]/(double)interval*(double)pre_int;
588 /* if too much of the pdp_prep is unknown we dump it */
589 if ((rrd.pdp_prep[i].scratch[PDP_unkn_sec_cnt].u_cnt
590 > rrd.ds_def[i].par[DS_mrhb_cnt].u_cnt) ||
591 (occu_pdp_st-proc_pdp_st <=
592 rrd.pdp_prep[i].scratch[PDP_unkn_sec_cnt].u_cnt)) {
593 pdp_temp[i] = DNAN;
594 } else {
595 pdp_temp[i] = rrd.pdp_prep[i].scratch[PDP_val].u_val
596 / (double)( occu_pdp_st
597 - proc_pdp_st
598 - rrd.pdp_prep[i].scratch[PDP_unkn_sec_cnt].u_cnt);
599 }
601 /* process CDEF data sources; remember each CDEF DS can
602 * only reference other DS with a lower index number */
603 if (dst_conv(rrd.ds_def[i].dst) == DST_CDEF) {
604 rpnp_t *rpnp;
605 rpnp = rpn_expand((rpn_cdefds_t *) &(rrd.ds_def[i].par[DS_cdef]));
606 /* substitue data values for OP_VARIABLE nodes */
607 for (ii = 0; rpnp[ii].op != OP_END; ii++)
608 {
609 if (rpnp[ii].op == OP_VARIABLE) {
610 rpnp[ii].op = OP_NUMBER;
611 rpnp[ii].val = pdp_temp[rpnp[ii].ptr];
612 }
613 }
614 /* run the rpn calculator */
615 if (rpn_calc(rpnp,&rpnstack,0,pdp_temp,i) == -1) {
616 free(rpnp);
617 break; /* exits the data sources pdp_temp loop */
618 }
619 }
621 /* make pdp_prep ready for the next run */
622 if(isnan(pdp_new[i])){
623 rrd.pdp_prep[i].scratch[PDP_unkn_sec_cnt].u_cnt = post_int;
624 rrd.pdp_prep[i].scratch[PDP_val].u_val = 0.0;
625 } else {
626 rrd.pdp_prep[i].scratch[PDP_unkn_sec_cnt].u_cnt = 0;
627 rrd.pdp_prep[i].scratch[PDP_val].u_val =
628 pdp_new[i]/(double)interval*(double)post_int;
629 }
631 #ifdef DEBUG
632 fprintf(stderr,
633 "PDP UPD ds[%lu]\t"
634 "pdp_temp %10.2f\t"
635 "new_prep %10.2f\t"
636 "new_unkn_sec %5lu\n",
637 i, pdp_temp[i],
638 rrd.pdp_prep[i].scratch[PDP_val].u_val,
639 rrd.pdp_prep[i].scratch[PDP_unkn_sec_cnt].u_cnt);
640 #endif
641 }
643 /* if there were errors during the last loop, bail out here */
644 if (rrd_test_error()){
645 free(step_start);
646 break;
647 }
649 /* compute the number of elapsed pdp_st moments */
650 elapsed_pdp_st = (occu_pdp_st - proc_pdp_st) / rrd.stat_head -> pdp_step;
651 #ifdef DEBUG
652 fprintf(stderr,"elapsed PDP steps: %lu\n", elapsed_pdp_st);
653 #endif
654 if (rra_step_cnt == NULL)
655 {
656 rra_step_cnt = (unsigned long *)
657 malloc((rrd.stat_head->rra_cnt)* sizeof(unsigned long));
658 }
660 for(i = 0, rra_start = rra_begin;
661 i < rrd.stat_head->rra_cnt;
662 rra_start += rrd.rra_def[i].row_cnt * rrd.stat_head -> ds_cnt * sizeof(rrd_value_t),
663 i++)
664 {
665 current_cf = cf_conv(rrd.rra_def[i].cf_nam);
666 start_pdp_offset = rrd.rra_def[i].pdp_cnt -
667 (proc_pdp_st / rrd.stat_head -> pdp_step) % rrd.rra_def[i].pdp_cnt;
668 if (start_pdp_offset <= elapsed_pdp_st) {
669 rra_step_cnt[i] = (elapsed_pdp_st - start_pdp_offset) /
670 rrd.rra_def[i].pdp_cnt + 1;
671 } else {
672 rra_step_cnt[i] = 0;
673 }
675 if (current_cf == CF_SEASONAL || current_cf == CF_DEVSEASONAL)
676 {
677 /* If this is a bulk update, we need to skip ahead in the seasonal
678 * arrays so that they will be correct for the next observed value;
679 * note that for the bulk update itself, no update will occur to
680 * DEVSEASONAL or SEASONAL; futhermore, HWPREDICT and DEVPREDICT will
681 * be set to DNAN. */
682 if (rra_step_cnt[i] > 2)
683 {
684 /* skip update by resetting rra_step_cnt[i],
685 * note that this is not data source specific; this is due
686 * to the bulk update, not a DNAN value for the specific data
687 * source. */
688 rra_step_cnt[i] = 0;
689 lookup_seasonal(&rrd,i,rra_start,rrd_file,elapsed_pdp_st,
690 &last_seasonal_coef);
691 lookup_seasonal(&rrd,i,rra_start,rrd_file,elapsed_pdp_st + 1,
692 &seasonal_coef);
693 }
695 /* periodically run a smoother for seasonal effects */
696 /* Need to use first cdp parameter buffer to track
697 * burnin (burnin requires a specific smoothing schedule).
698 * The CDP_init_seasonal parameter is really an RRA level,
699 * not a data source within RRA level parameter, but the rra_def
700 * is read only for rrd_update (not flushed to disk). */
701 iii = i*(rrd.stat_head -> ds_cnt);
702 if (rrd.cdp_prep[iii].scratch[CDP_init_seasonal].u_cnt
703 <= BURNIN_CYCLES)
704 {
705 if (rrd.rra_ptr[i].cur_row + elapsed_pdp_st
706 > rrd.rra_def[i].row_cnt - 1) {
707 /* mark off one of the burnin cycles */
708 ++(rrd.cdp_prep[iii].scratch[CDP_init_seasonal].u_cnt);
709 schedule_smooth = 1;
710 }
711 } else {
712 /* someone has no doubt invented a trick to deal with this
713 * wrap around, but at least this code is clear. */
714 if (rrd.rra_def[i].par[RRA_seasonal_smooth_idx].u_cnt >
715 rrd.rra_ptr[i].cur_row)
716 {
717 /* here elapsed_pdp_st = rra_step_cnt[i] because of 1-1
718 * mapping between PDP and CDP */
719 if (rrd.rra_ptr[i].cur_row + elapsed_pdp_st
720 >= rrd.rra_def[i].par[RRA_seasonal_smooth_idx].u_cnt)
721 {
722 #ifdef DEBUG
723 fprintf(stderr,
724 "schedule_smooth 1: cur_row %lu, elapsed_pdp_st %lu, smooth idx %lu\n",
725 rrd.rra_ptr[i].cur_row, elapsed_pdp_st,
726 rrd.rra_def[i].par[RRA_seasonal_smooth_idx].u_cnt);
727 #endif
728 schedule_smooth = 1;
729 }
730 } else {
731 /* can't rely on negative numbers because we are working with
732 * unsigned values */
733 /* Don't need modulus here. If we've wrapped more than once, only
734 * one smooth is executed at the end. */
735 if (rrd.rra_ptr[i].cur_row + elapsed_pdp_st >= rrd.rra_def[i].row_cnt
736 && rrd.rra_ptr[i].cur_row + elapsed_pdp_st - rrd.rra_def[i].row_cnt
737 >= rrd.rra_def[i].par[RRA_seasonal_smooth_idx].u_cnt)
738 {
739 #ifdef DEBUG
740 fprintf(stderr,
741 "schedule_smooth 2: cur_row %lu, elapsed_pdp_st %lu, smooth idx %lu\n",
742 rrd.rra_ptr[i].cur_row, elapsed_pdp_st,
743 rrd.rra_def[i].par[RRA_seasonal_smooth_idx].u_cnt);
744 #endif
745 schedule_smooth = 1;
746 }
747 }
748 }
750 rra_current = ftell(rrd_file);
751 } /* if cf is DEVSEASONAL or SEASONAL */
753 if (rrd_test_error()) break;
755 /* update CDP_PREP areas */
756 /* loop over data soures within each RRA */
757 for(ii = 0;
758 ii < rrd.stat_head->ds_cnt;
759 ii++)
760 {
762 /* iii indexes the CDP prep area for this data source within the RRA */
763 iii=i*rrd.stat_head->ds_cnt+ii;
765 if (rrd.rra_def[i].pdp_cnt > 1) {
767 if (rra_step_cnt[i] > 0) {
768 /* If we are in this block, as least 1 CDP value will be written to
769 * disk, this is the CDP_primary_val entry. If more than 1 value needs
770 * to be written, then the "fill in" value is the CDP_secondary_val
771 * entry. */
772 if (isnan(pdp_temp[ii]))
773 {
774 rrd.cdp_prep[iii].scratch[CDP_unkn_pdp_cnt].u_cnt += start_pdp_offset;
775 rrd.cdp_prep[iii].scratch[CDP_secondary_val].u_val = DNAN;
776 } else {
777 /* CDP_secondary value is the RRA "fill in" value for intermediary
778 * CDP data entries. No matter the CF, the value is the same because
779 * the average, max, min, and last of a list of identical values is
780 * the same, namely, the value itself. */
781 rrd.cdp_prep[iii].scratch[CDP_secondary_val].u_val = pdp_temp[ii];
782 }
784 if (rrd.cdp_prep[iii].scratch[CDP_unkn_pdp_cnt].u_cnt
785 > rrd.rra_def[i].pdp_cnt*
786 rrd.rra_def[i].par[RRA_cdp_xff_val].u_val)
787 {
788 rrd.cdp_prep[iii].scratch[CDP_primary_val].u_val = DNAN;
789 /* initialize carry over */
790 if (current_cf == CF_AVERAGE) {
791 if (isnan(pdp_temp[ii])) {
792 rrd.cdp_prep[iii].scratch[CDP_val].u_val = DNAN;
793 } else {
794 rrd.cdp_prep[iii].scratch[CDP_val].u_val = pdp_temp[ii] *
795 ((elapsed_pdp_st - start_pdp_offset) % rrd.rra_def[i].pdp_cnt);
796 }
797 } else {
798 rrd.cdp_prep[iii].scratch[CDP_val].u_val = pdp_temp[ii];
799 }
800 } else {
801 rrd_value_t cum_val, cur_val;
802 switch (current_cf) {
803 case CF_AVERAGE:
804 cum_val = IFDNAN(rrd.cdp_prep[iii].scratch[CDP_val].u_val, 0.0);
805 cur_val = IFDNAN(pdp_temp[ii],0.0);
806 rrd.cdp_prep[iii].scratch[CDP_primary_val].u_val =
807 (cum_val + cur_val * start_pdp_offset) /
808 (rrd.rra_def[i].pdp_cnt
809 -rrd.cdp_prep[iii].scratch[CDP_unkn_pdp_cnt].u_cnt);
810 /* initialize carry over value */
811 if (isnan(pdp_temp[ii])) {
812 rrd.cdp_prep[iii].scratch[CDP_val].u_val = DNAN;
813 } else {
814 rrd.cdp_prep[iii].scratch[CDP_val].u_val = pdp_temp[ii] *
815 ((elapsed_pdp_st - start_pdp_offset) % rrd.rra_def[i].pdp_cnt);
816 }
817 break;
818 case CF_MAXIMUM:
819 cum_val = IFDNAN(rrd.cdp_prep[iii].scratch[CDP_val].u_val, -DINF);
820 cur_val = IFDNAN(pdp_temp[ii],-DINF);
821 #ifdef DEBUG
822 if (isnan(rrd.cdp_prep[iii].scratch[CDP_val].u_val) &&
823 isnan(pdp_temp[ii])) {
824 fprintf(stderr,
825 "RRA %lu, DS %lu, both CDP_val and pdp_temp are DNAN!",
826 i,ii);
827 exit(-1);
828 }
829 #endif
830 if (cur_val > cum_val)
831 rrd.cdp_prep[iii].scratch[CDP_primary_val].u_val = cur_val;
832 else
833 rrd.cdp_prep[iii].scratch[CDP_primary_val].u_val = cum_val;
834 /* initialize carry over value */
835 rrd.cdp_prep[iii].scratch[CDP_val].u_val = pdp_temp[ii];
836 break;
837 case CF_MINIMUM:
838 cum_val = IFDNAN(rrd.cdp_prep[iii].scratch[CDP_val].u_val, DINF);
839 cur_val = IFDNAN(pdp_temp[ii],DINF);
840 #ifdef DEBUG
841 if (isnan(rrd.cdp_prep[iii].scratch[CDP_val].u_val) &&
842 isnan(pdp_temp[ii])) {
843 fprintf(stderr,
844 "RRA %lu, DS %lu, both CDP_val and pdp_temp are DNAN!",
845 i,ii);
846 exit(-1);
847 }
848 #endif
849 if (cur_val < cum_val)
850 rrd.cdp_prep[iii].scratch[CDP_primary_val].u_val = cur_val;
851 else
852 rrd.cdp_prep[iii].scratch[CDP_primary_val].u_val = cum_val;
853 /* initialize carry over value */
854 rrd.cdp_prep[iii].scratch[CDP_val].u_val = pdp_temp[ii];
855 break;
856 case CF_LAST:
857 default:
858 rrd.cdp_prep[iii].scratch[CDP_primary_val].u_val = pdp_temp[ii];
859 /* initialize carry over value */
860 rrd.cdp_prep[iii].scratch[CDP_val].u_val = pdp_temp[ii];
861 break;
862 }
863 } /* endif meets xff value requirement for a valid value */
864 /* initialize carry over CDP_unkn_pdp_cnt, this must after CDP_primary_val
865 * is set because CDP_unkn_pdp_cnt is required to compute that value. */
866 if (isnan(pdp_temp[ii]))
867 rrd.cdp_prep[iii].scratch[CDP_unkn_pdp_cnt].u_cnt =
868 (elapsed_pdp_st - start_pdp_offset) % rrd.rra_def[i].pdp_cnt;
869 else
870 rrd.cdp_prep[iii].scratch[CDP_unkn_pdp_cnt].u_cnt = 0;
871 } else /* rra_step_cnt[i] == 0 */
872 {
873 #ifdef DEBUG
874 if (isnan(rrd.cdp_prep[iii].scratch[CDP_val].u_val)) {
875 fprintf(stderr,"schedule CDP_val update, RRA %lu DS %lu, DNAN\n",
876 i,ii);
877 } else {
878 fprintf(stderr,"schedule CDP_val update, RRA %lu DS %lu, %10.2f\n",
879 i,ii,rrd.cdp_prep[iii].scratch[CDP_val].u_val);
880 }
881 #endif
882 if (isnan(pdp_temp[ii])) {
883 rrd.cdp_prep[iii].scratch[CDP_unkn_pdp_cnt].u_cnt += elapsed_pdp_st;
884 } else if (isnan(rrd.cdp_prep[iii].scratch[CDP_val].u_val))
885 {
886 if (current_cf == CF_AVERAGE) {
887 rrd.cdp_prep[iii].scratch[CDP_val].u_val = pdp_temp[ii] *
888 elapsed_pdp_st;
889 } else {
890 rrd.cdp_prep[iii].scratch[CDP_val].u_val = pdp_temp[ii];
891 }
892 #ifdef DEBUG
893 fprintf(stderr,"Initialize CDP_val for RRA %lu DS %lu: %10.2f\n",
894 i,ii,rrd.cdp_prep[iii].scratch[CDP_val].u_val);
895 #endif
896 } else {
897 switch (current_cf) {
898 case CF_AVERAGE:
899 rrd.cdp_prep[iii].scratch[CDP_val].u_val += pdp_temp[ii] *
900 elapsed_pdp_st;
901 break;
902 case CF_MINIMUM:
903 if (pdp_temp[ii] < rrd.cdp_prep[iii].scratch[CDP_val].u_val)
904 rrd.cdp_prep[iii].scratch[CDP_val].u_val = pdp_temp[ii];
905 break;
906 case CF_MAXIMUM:
907 if (pdp_temp[ii] > rrd.cdp_prep[iii].scratch[CDP_val].u_val)
908 rrd.cdp_prep[iii].scratch[CDP_val].u_val = pdp_temp[ii];
909 break;
910 case CF_LAST:
911 default:
912 rrd.cdp_prep[iii].scratch[CDP_val].u_val = pdp_temp[ii];
913 break;
914 }
915 }
916 }
917 } else { /* rrd.rra_def[i].pdp_cnt == 1 */
918 if (elapsed_pdp_st > 2)
919 {
920 switch (current_cf) {
921 case CF_AVERAGE:
922 default:
923 rrd.cdp_prep[iii].scratch[CDP_primary_val].u_val=pdp_temp[ii];
924 rrd.cdp_prep[iii].scratch[CDP_secondary_val].u_val=pdp_temp[ii];
925 break;
926 case CF_SEASONAL:
927 case CF_DEVSEASONAL:
928 /* need to update cached seasonal values, so they are consistent
929 * with the bulk update */
930 /* WARNING: code relies on the fact that CDP_hw_last_seasonal and
931 * CDP_last_deviation are the same. */
932 rrd.cdp_prep[iii].scratch[CDP_hw_last_seasonal].u_val =
933 last_seasonal_coef[ii];
934 rrd.cdp_prep[iii].scratch[CDP_hw_seasonal].u_val =
935 seasonal_coef[ii];
936 break;
937 case CF_HWPREDICT:
938 /* need to update the null_count and last_null_count.
939 * even do this for non-DNAN pdp_temp because the
940 * algorithm is not learning from batch updates. */
941 rrd.cdp_prep[iii].scratch[CDP_null_count].u_cnt +=
942 elapsed_pdp_st;
943 rrd.cdp_prep[iii].scratch[CDP_last_null_count].u_cnt +=
944 elapsed_pdp_st - 1;
945 /* fall through */
946 case CF_DEVPREDICT:
947 rrd.cdp_prep[iii].scratch[CDP_primary_val].u_val = DNAN;
948 rrd.cdp_prep[iii].scratch[CDP_secondary_val].u_val = DNAN;
949 break;
950 case CF_FAILURES:
951 /* do not count missed bulk values as failures */
952 rrd.cdp_prep[iii].scratch[CDP_primary_val].u_val = 0;
953 rrd.cdp_prep[iii].scratch[CDP_secondary_val].u_val = 0;
954 /* need to reset violations buffer.
955 * could do this more carefully, but for now, just
956 * assume a bulk update wipes away all violations. */
957 erase_violations(&rrd, iii, i);
958 break;
959 }
960 }
961 } /* endif rrd.rra_def[i].pdp_cnt == 1 */
963 if (rrd_test_error()) break;
965 } /* endif data sources loop */
966 } /* end RRA Loop */
968 /* this loop is only entered if elapsed_pdp_st < 3 */
969 for (j = elapsed_pdp_st, scratch_idx = CDP_primary_val;
970 j > 0 && j < 3; j--, scratch_idx = CDP_secondary_val)
971 {
972 for(i = 0, rra_start = rra_begin;
973 i < rrd.stat_head->rra_cnt;
974 rra_start += rrd.rra_def[i].row_cnt * rrd.stat_head -> ds_cnt * sizeof(rrd_value_t),
975 i++)
976 {
977 if (rrd.rra_def[i].pdp_cnt > 1) continue;
979 current_cf = cf_conv(rrd.rra_def[i].cf_nam);
980 if (current_cf == CF_SEASONAL || current_cf == CF_DEVSEASONAL)
981 {
982 lookup_seasonal(&rrd,i,rra_start,rrd_file,
983 elapsed_pdp_st + (scratch_idx == CDP_primary_val ? 1 : 2),
984 &seasonal_coef);
985 rra_current = ftell(rrd_file);
986 }
987 if (rrd_test_error()) break;
988 /* loop over data soures within each RRA */
989 for(ii = 0;
990 ii < rrd.stat_head->ds_cnt;
991 ii++)
992 {
993 update_aberrant_CF(&rrd,pdp_temp[ii],current_cf,
994 i*(rrd.stat_head->ds_cnt) + ii,i,ii,
995 scratch_idx, seasonal_coef);
996 }
997 } /* end RRA Loop */
998 if (rrd_test_error()) break;
999 } /* end elapsed_pdp_st loop */
1001 if (rrd_test_error()) break;
1003 /* Ready to write to disk */
1004 /* Move sequentially through the file, writing one RRA at a time.
1005 * Note this architecture divorces the computation of CDP with
1006 * flushing updated RRA entries to disk. */
1007 for(i = 0, rra_start = rra_begin;
1008 i < rrd.stat_head->rra_cnt;
1009 rra_start += rrd.rra_def[i].row_cnt * rrd.stat_head -> ds_cnt * sizeof(rrd_value_t),
1010 i++) {
1011 /* is there anything to write for this RRA? If not, continue. */
1012 if (rra_step_cnt[i] == 0) continue;
1014 /* write the first row */
1015 #ifdef DEBUG
1016 fprintf(stderr," -- RRA Preseek %ld\n",ftell(rrd_file));
1017 #endif
1018 rrd.rra_ptr[i].cur_row++;
1019 if (rrd.rra_ptr[i].cur_row >= rrd.rra_def[i].row_cnt)
1020 rrd.rra_ptr[i].cur_row = 0; /* wrap around */
1021 /* positition on the first row */
1022 rra_pos_tmp = rra_start +
1023 (rrd.stat_head->ds_cnt)*(rrd.rra_ptr[i].cur_row)*sizeof(rrd_value_t);
1024 if(rra_pos_tmp != rra_current) {
1025 if(fseek(rrd_file, rra_pos_tmp, SEEK_SET) != 0){
1026 rrd_set_error("seek error in rrd");
1027 break;
1028 }
1029 rra_current = rra_pos_tmp;
1030 }
1032 #ifdef DEBUG
1033 fprintf(stderr," -- RRA Postseek %ld\n",ftell(rrd_file));
1034 #endif
1035 scratch_idx = CDP_primary_val;
1036 write_RRA_row(&rrd, i, &rra_current, scratch_idx, rrd_file);
1037 if (rrd_test_error()) break;
1039 /* write other rows of the bulk update, if any */
1040 scratch_idx = CDP_secondary_val;
1041 for ( ; rra_step_cnt[i] > 1;
1042 rra_step_cnt[i]--, rrd.rra_ptr[i].cur_row++)
1043 {
1044 if (rrd.rra_ptr[i].cur_row == rrd.rra_def[i].row_cnt)
1045 {
1046 #ifdef DEBUG
1047 fprintf(stderr,"Wraparound for RRA %s, %lu updates left\n",
1048 rrd.rra_def[i].cf_nam, rra_step_cnt[i] - 1);
1049 #endif
1050 /* wrap */
1051 rrd.rra_ptr[i].cur_row = 0;
1052 /* seek back to beginning of current rra */
1053 if (fseek(rrd_file, rra_start, SEEK_SET) != 0)
1054 {
1055 rrd_set_error("seek error in rrd");
1056 break;
1057 }
1058 #ifdef DEBUG
1059 fprintf(stderr," -- Wraparound Postseek %ld\n",ftell(rrd_file));
1060 #endif
1061 rra_current = rra_start;
1062 }
1063 write_RRA_row(&rrd, i, &rra_current, scratch_idx, rrd_file);
1064 }
1066 if (rrd_test_error())
1067 break;
1068 } /* RRA LOOP */
1070 /* break out of the argument parsing loop if error_string is set */
1071 if (rrd_test_error()){
1072 free(step_start);
1073 break;
1074 }
1076 } /* endif a pdp_st has occurred */
1077 rrd.live_head->last_up = current_time;
1078 free(step_start);
1079 } /* function argument loop */
1081 if (seasonal_coef != NULL) free(seasonal_coef);
1082 if (last_seasonal_coef != NULL) free(last_seasonal_coef);
1083 if (rra_step_cnt != NULL) free(rra_step_cnt);
1084 rpnstack_free(&rpnstack);
1086 /* if we got here and if there is an error and if the file has not been
1087 * written to, then close things up and return. */
1088 if (rrd_test_error()) {
1089 free(updvals);
1090 free(tmpl_idx);
1091 rrd_free(&rrd);
1092 free(pdp_temp);
1093 free(pdp_new);
1094 fclose(rrd_file);
1095 return(-1);
1096 }
1098 /* aargh ... that was tough ... so many loops ... anyway, its done.
1099 * we just need to write back the live header portion now*/
1101 if (fseek(rrd_file, (sizeof(stat_head_t)
1102 + sizeof(ds_def_t)*rrd.stat_head->ds_cnt
1103 + sizeof(rra_def_t)*rrd.stat_head->rra_cnt),
1104 SEEK_SET) != 0) {
1105 rrd_set_error("seek rrd for live header writeback");
1106 free(updvals);
1107 free(tmpl_idx);
1108 rrd_free(&rrd);
1109 free(pdp_temp);
1110 free(pdp_new);
1111 fclose(rrd_file);
1112 return(-1);
1113 }
1115 if(fwrite( rrd.live_head,
1116 sizeof(live_head_t), 1, rrd_file) != 1){
1117 rrd_set_error("fwrite live_head to rrd");
1118 free(updvals);
1119 rrd_free(&rrd);
1120 free(tmpl_idx);
1121 free(pdp_temp);
1122 free(pdp_new);
1123 fclose(rrd_file);
1124 return(-1);
1125 }
1127 if(fwrite( rrd.pdp_prep,
1128 sizeof(pdp_prep_t),
1129 rrd.stat_head->ds_cnt, rrd_file) != rrd.stat_head->ds_cnt){
1130 rrd_set_error("ftwrite pdp_prep to rrd");
1131 free(updvals);
1132 rrd_free(&rrd);
1133 free(tmpl_idx);
1134 free(pdp_temp);
1135 free(pdp_new);
1136 fclose(rrd_file);
1137 return(-1);
1138 }
1140 if(fwrite( rrd.cdp_prep,
1141 sizeof(cdp_prep_t),
1142 rrd.stat_head->rra_cnt *rrd.stat_head->ds_cnt, rrd_file)
1143 != rrd.stat_head->rra_cnt *rrd.stat_head->ds_cnt){
1145 rrd_set_error("ftwrite cdp_prep to rrd");
1146 free(updvals);
1147 free(tmpl_idx);
1148 rrd_free(&rrd);
1149 free(pdp_temp);
1150 free(pdp_new);
1151 fclose(rrd_file);
1152 return(-1);
1153 }
1155 if(fwrite( rrd.rra_ptr,
1156 sizeof(rra_ptr_t),
1157 rrd.stat_head->rra_cnt,rrd_file) != rrd.stat_head->rra_cnt){
1158 rrd_set_error("fwrite rra_ptr to rrd");
1159 free(updvals);
1160 free(tmpl_idx);
1161 rrd_free(&rrd);
1162 free(pdp_temp);
1163 free(pdp_new);
1164 fclose(rrd_file);
1165 return(-1);
1166 }
1168 /* OK now close the files and free the memory */
1169 if(fclose(rrd_file) != 0){
1170 rrd_set_error("closing rrd");
1171 free(updvals);
1172 free(tmpl_idx);
1173 rrd_free(&rrd);
1174 free(pdp_temp);
1175 free(pdp_new);
1176 return(-1);
1177 }
1179 /* calling the smoothing code here guarantees at most
1180 * one smoothing operation per rrd_update call. Unfortunately,
1181 * it is possible with bulk updates, or a long-delayed update
1182 * for smoothing to occur off-schedule. This really isn't
1183 * critical except during the burning cycles. */
1184 if (schedule_smooth)
1185 {
1186 #ifndef WIN32
1187 rrd_file = fopen(filename,"r+");
1188 #else
1189 rrd_file = fopen(filename,"rb+");
1190 #endif
1191 rra_start = rra_begin;
1192 for (i = 0; i < rrd.stat_head -> rra_cnt; ++i)
1193 {
1194 if (cf_conv(rrd.rra_def[i].cf_nam) == CF_DEVSEASONAL ||
1195 cf_conv(rrd.rra_def[i].cf_nam) == CF_SEASONAL)
1196 {
1197 #ifdef DEBUG
1198 fprintf(stderr,"Running smoother for rra %ld\n",i);
1199 #endif
1200 apply_smoother(&rrd,i,rra_start,rrd_file);
1201 if (rrd_test_error())
1202 break;
1203 }
1204 rra_start += rrd.rra_def[i].row_cnt
1205 *rrd.stat_head->ds_cnt*sizeof(rrd_value_t);
1206 }
1207 fclose(rrd_file);
1208 }
1209 rrd_free(&rrd);
1210 free(updvals);
1211 free(tmpl_idx);
1212 free(pdp_new);
1213 free(pdp_temp);
1214 return(0);
1215 }
1217 /*
1218 * get exclusive lock to whole file.
1219 * lock gets removed when we close the file
1220 *
1221 * returns 0 on success
1222 */
1223 int
1224 LockRRD(FILE *rrdfile)
1225 {
1226 int rrd_fd; /* File descriptor for RRD */
1227 int stat;
1229 rrd_fd = fileno(rrdfile);
1231 {
1232 #ifndef WIN32
1233 struct flock lock;
1234 lock.l_type = F_WRLCK; /* exclusive write lock */
1235 lock.l_len = 0; /* whole file */
1236 lock.l_start = 0; /* start of file */
1237 lock.l_whence = SEEK_SET; /* end of file */
1239 stat = fcntl(rrd_fd, F_SETLK, &lock);
1240 #else
1241 struct _stat st;
1243 if ( _fstat( rrd_fd, &st ) == 0 ) {
1244 stat = _locking ( rrd_fd, _LK_NBLCK, st.st_size );
1245 } else {
1246 stat = -1;
1247 }
1248 #endif
1249 }
1251 return(stat);
1252 }
1255 void
1256 write_RRA_row (rrd_t *rrd, unsigned long rra_idx, unsigned long *rra_current,
1257 unsigned short CDP_scratch_idx, FILE *rrd_file)
1258 {
1259 unsigned long ds_idx, cdp_idx;
1261 for (ds_idx = 0; ds_idx < rrd -> stat_head -> ds_cnt; ds_idx++)
1262 {
1263 /* compute the cdp index */
1264 cdp_idx =rra_idx * (rrd -> stat_head->ds_cnt) + ds_idx;
1265 #ifdef DEBUG
1266 fprintf(stderr," -- RRA WRITE VALUE %e, at %ld CF:%s\n",
1267 rrd -> cdp_prep[cdp_idx].scratch[CDP_scratch_idx].u_val,ftell(rrd_file),
1268 rrd -> rra_def[rra_idx].cf_nam);
1269 #endif
1271 if(fwrite(&(rrd -> cdp_prep[cdp_idx].scratch[CDP_scratch_idx].u_val),
1272 sizeof(rrd_value_t),1,rrd_file) != 1)
1273 {
1274 rrd_set_error("writing rrd");
1275 return;
1276 }
1277 *rra_current += sizeof(rrd_value_t);
1278 }
1279 }