1 /*****************************************************************************
2 * RRDtool 1.0.33 Copyright Tobias Oetiker, 1997 - 2000
3 *****************************************************************************
4 * rrd_update.c RRD Update Function
5 *****************************************************************************
6 * $Id$
7 * $Log$
8 * Revision 1.4 2001/03/10 23:54:41 oetiker
9 * Support for COMPUTE data sources (CDEF data sources). Removes the RPN
10 * parser and calculator from rrd_graph and puts then in a new file,
11 * rrd_rpncalc.c. Changes to core files rrd_create and rrd_update. Some
12 * clean-up of aberrant behavior stuff, including a bug fix.
13 * Documentation update (rrdcreate.pod, rrdupdate.pod). Change xml format.
14 * -- Jake Brutlag <jakeb@corp.webtv.net>
15 *
16 * Revision 1.3 2001/03/04 13:01:55 oetiker
17 * Aberrant Behavior Detection support. A brief overview added to rrdtool.pod.
18 * Major updates to rrd_update.c, rrd_create.c. Minor update to other core files.
19 * This is backwards compatible! But new files using the Aberrant stuff are not readable
20 * by old rrdtool versions. See http://cricket.sourceforge.net/aberrant/rrd_hw.htm
21 * -- Jake Brutlag <jakeb@corp.webtv.net>
22 *
23 * Revision 1.2 2001/03/04 11:14:25 oetiker
24 * added at-style-time@value:value syntax to rrd_update
25 * -- Dave Bodenstab <imdave@mcs.net>
26 *
27 * Revision 1.1.1.1 2001/02/25 22:25:06 oetiker
28 * checkin
29 *
30 *****************************************************************************/
32 #include "rrd_tool.h"
33 #include <sys/types.h>
34 #include <fcntl.h>
35 #include "rrd_hw.h"
36 #include "rrd_rpncalc.h"
38 #ifdef WIN32
39 #include <sys/locking.h>
40 #include <sys/stat.h>
41 #include <io.h>
42 #endif
44 /* Local prototypes */
45 int LockRRD(FILE *rrd_file);
46 void write_RRA_row (rrd_t *rrd, unsigned long rra_idx, unsigned long *rra_current,
47 unsigned short CDP_scratch_idx, FILE *rrd_file);
/* Return X unless it is NaN, in which case return Y.
 * Fixed: the original definition ended in a semicolon, so every expansion
 * injected a stray `;`. That only worked by accident in statement context
 * (the extra `;` became an empty statement) and would break the macro in
 * expression context or in an unbraced if/else branch. A function-like
 * macro must expand to a plain parenthesized expression. */
#define IFDNAN(X,Y) (isnan(X) ? (Y) : (X))
#ifdef STANDALONE
/* Stand-alone entry point: hand the whole command line to rrd_update()
 * and, if the library flagged an error, print usage plus the error text.
 * Returns 0 on success, 1 on failure. */
int
main(int argc, char **argv){
    rrd_update(argc, argv);
    if (!rrd_test_error())
        return 0;
    /* an error was raised: show usage first, then the message itself */
    printf("RRDtool 1.0.33 Copyright 1997-2000 by Tobias Oetiker <tobi@oetiker.ch>\n\n"
           "Usage: rrdupdate filename\n"
           "\t\t\t[--template|-t ds-name:ds-name:...]\n"
           "\t\t\ttime|N:value[:value...]\n\n"
           "\t\t\tat-time@value[:value...]\n\n"
           "\t\t\t[ time:value[:value...] ..]\n\n");
    printf("ERROR: %s\n", rrd_get_error());
    rrd_clear_error();
    return 1;
}
#endif
72 int
73 rrd_update(int argc, char **argv)
74 {
76 int arg_i = 2;
77 short j;
78 long i,ii,iii=1;
80 unsigned long rra_begin; /* byte pointer to the rra
81 * area in the rrd file. this
82 * pointer never changes value */
83 unsigned long rra_start; /* byte pointer to the rra
84 * area in the rrd file. this
85 * pointer changes as each rrd is
86 * processed. */
87 unsigned long rra_current; /* byte pointer to the current write
88 * spot in the rrd file. */
89 unsigned long rra_pos_tmp; /* temporary byte pointer. */
90 unsigned long interval,
91 pre_int,post_int; /* interval between this and
92 * the last run */
93 unsigned long proc_pdp_st; /* which pdp_st was the last
94 * to be processed */
95 unsigned long occu_pdp_st; /* when was the pdp_st
96 * before the last update
97 * time */
98 unsigned long proc_pdp_age; /* how old was the data in
99 * the pdp prep area when it
100 * was last updated */
101 unsigned long occu_pdp_age; /* how long ago was the last
102 * pdp_step time */
103 rrd_value_t *pdp_new; /* prepare the incoming data
104 * to be added the the
105 * existing entry */
106 rrd_value_t *pdp_temp; /* prepare the pdp values
107 * to be added the the
108 * cdp values */
110 long *tmpl_idx; /* index representing the settings
111 transported by the template index */
112 long tmpl_cnt = 2; /* time and data */
114 FILE *rrd_file;
115 rrd_t rrd;
116 time_t current_time = time(NULL);
117 char **updvals;
118 int schedule_smooth = 0;
119 char *template = NULL;
120 rrd_value_t *seasonal_coef = NULL, *last_seasonal_coef = NULL;
121 /* a vector of future Holt-Winters seasonal coefs */
122 unsigned long elapsed_pdp_st;
123 /* number of elapsed PDP steps since last update */
124 unsigned long *rra_step_cnt = NULL;
125 /* number of rows to be updated in an RRA for a data
126 * value. */
127 unsigned long start_pdp_offset;
128 /* number of PDP steps since the last update that
129 * are assigned to the first CDP to be generated
130 * since the last update. */
131 unsigned short scratch_idx;
132 /* index into the CDP scratch array */
133 enum cf_en current_cf;
134 /* numeric id of the current consolidation function */
135 rpnstack_t rpnstack; /* used for COMPUTE DS */
137 rpnstack_init(&rpnstack);
139 while (1) {
140 static struct option long_options[] =
141 {
142 {"template", required_argument, 0, 't'},
143 {0,0,0,0}
144 };
145 int option_index = 0;
146 int opt;
147 opt = getopt_long(argc, argv, "t:",
148 long_options, &option_index);
150 if (opt == EOF)
151 break;
153 switch(opt) {
154 case 't':
155 template = optarg;
156 break;
158 case '?':
159 rrd_set_error("unknown option '%s'",argv[optind-1]);
160 rrd_free(&rrd);
161 return(-1);
162 }
163 }
165 /* need at least 2 arguments: filename, data. */
166 if (argc-optind < 2) {
167 rrd_set_error("Not enough arguments");
168 return -1;
169 }
171 if(rrd_open(argv[optind],&rrd_file,&rrd, RRD_READWRITE)==-1){
172 return -1;
173 }
174 rra_current = rra_start = rra_begin = ftell(rrd_file);
    /* This is defined in the ANSI C standard, section 7.9.5.3:

        When a file is opened with update mode ('+' as the second
        or third character in the ... list of mode argument
        variables), both input and output may be performed on the
        associated stream. However, ... input may not be directly
        followed by output without an intervening call to a file
        positioning function, unless the input operation encounters
        end-of-file. */
184 fseek(rrd_file, 0, SEEK_CUR);
187 /* get exclusive lock to whole file.
188 * lock gets removed when we close the file.
189 */
190 if (LockRRD(rrd_file) != 0) {
191 rrd_set_error("could not lock RRD");
192 rrd_free(&rrd);
193 fclose(rrd_file);
194 return(-1);
195 }
197 if((updvals = malloc( sizeof(char*) * (rrd.stat_head->ds_cnt+1)))==NULL){
198 rrd_set_error("allocating updvals pointer array");
199 rrd_free(&rrd);
200 fclose(rrd_file);
201 return(-1);
202 }
204 if ((pdp_temp = malloc(sizeof(rrd_value_t)
205 *rrd.stat_head->ds_cnt))==NULL){
206 rrd_set_error("allocating pdp_temp ...");
207 free(updvals);
208 rrd_free(&rrd);
209 fclose(rrd_file);
210 return(-1);
211 }
213 if ((tmpl_idx = malloc(sizeof(unsigned long)
214 *(rrd.stat_head->ds_cnt+1)))==NULL){
215 rrd_set_error("allocating tmpl_idx ...");
216 free(pdp_temp);
217 free(updvals);
218 rrd_free(&rrd);
219 fclose(rrd_file);
220 return(-1);
221 }
222 /* initialize template redirector */
223 /* default config example (assume DS 1 is a CDEF DS)
224 tmpl_idx[0] -> 0; (time)
225 tmpl_idx[1] -> 1; (DS 0)
226 tmpl_idx[2] -> 3; (DS 2)
227 tmpl_idx[3] -> 4; (DS 3) */
228 tmpl_idx[0] = 0; /* time */
229 for (i = 1, ii = 1 ; i <= rrd.stat_head->ds_cnt ; i++)
230 {
231 if (dst_conv(rrd.ds_def[i-1].dst) != DST_CDEF)
232 tmpl_idx[ii++]=i;
233 }
234 tmpl_cnt= ii;
236 if (template) {
237 char *dsname;
238 int tmpl_len;
239 dsname = template;
240 tmpl_cnt = 1; /* the first entry is the time */
241 tmpl_len = strlen(template);
242 for(i=0;i<=tmpl_len ;i++) {
243 if (template[i] == ':' || template[i] == '\0') {
244 template[i] = '\0';
245 if (tmpl_cnt>rrd.stat_head->ds_cnt){
246 rrd_set_error("Template contains more DS definitions than RRD");
247 free(updvals); free(pdp_temp);
248 free(tmpl_idx); rrd_free(&rrd);
249 fclose(rrd_file); return(-1);
250 }
251 if ((tmpl_idx[tmpl_cnt++] = ds_match(&rrd,dsname)) == -1){
252 rrd_set_error("unknown DS name '%s'",dsname);
253 free(updvals); free(pdp_temp);
254 free(tmpl_idx); rrd_free(&rrd);
255 fclose(rrd_file); return(-1);
256 } else {
257 /* the first element is always the time */
258 tmpl_idx[tmpl_cnt-1]++;
259 /* go to the next entry on the template */
260 dsname = &template[i+1];
261 /* fix the damage we did before */
262 if (i<tmpl_len) {
263 template[i]=':';
264 }
266 }
267 }
268 }
269 }
270 if ((pdp_new = malloc(sizeof(rrd_value_t)
271 *rrd.stat_head->ds_cnt))==NULL){
272 rrd_set_error("allocating pdp_new ...");
273 free(updvals);
274 free(pdp_temp);
275 free(tmpl_idx);
276 rrd_free(&rrd);
277 fclose(rrd_file);
278 return(-1);
279 }
281 /* loop through the arguments. */
282 for(arg_i=optind+1; arg_i<argc;arg_i++) {
283 char *stepper = malloc((strlen(argv[arg_i])+1)*sizeof(char));
284 char *step_start = stepper;
285 char *p;
286 char *parsetime_error = NULL;
287 enum {atstyle, normal} timesyntax;
288 struct time_value ds_tv;
289 if (stepper == NULL){
290 rrd_set_error("failed duplication argv entry");
291 free(updvals);
292 free(pdp_temp);
293 free(tmpl_idx);
294 rrd_free(&rrd);
295 fclose(rrd_file);
296 return(-1);
297 }
298 /* initialize all ds input to unknown except the first one
299 which has always got to be set */
300 for(ii=1;ii<=rrd.stat_head->ds_cnt;ii++) updvals[ii] = "U";
301 strcpy(stepper,argv[arg_i]);
302 updvals[0]=stepper;
303 /* separate all ds elements; first must be examined separately
304 due to alternate time syntax */
305 if ((p=strchr(stepper,'@'))!=NULL) {
306 timesyntax = atstyle;
307 *p = '\0';
308 stepper = p+1;
309 } else if ((p=strchr(stepper,':'))!=NULL) {
310 timesyntax = normal;
311 *p = '\0';
312 stepper = p+1;
313 } else {
314 rrd_set_error("expected timestamp not found in data source from %s:...",
315 argv[arg_i]);
316 free(step_start);
317 break;
318 }
319 ii=1;
320 updvals[tmpl_idx[ii]] = stepper;
321 while (*stepper) {
322 if (*stepper == ':') {
323 *stepper = '\0';
324 ii++;
325 if (ii<tmpl_cnt){
326 updvals[tmpl_idx[ii]] = stepper+1;
327 }
328 }
329 stepper++;
330 }
332 if (ii != tmpl_cnt-1) {
333 rrd_set_error("expected %lu data source readings (got %lu) from %s:...",
334 tmpl_cnt-1, ii, argv[arg_i]);
335 free(step_start);
336 break;
337 }
339 /* get the time from the reading ... handle N */
340 if (timesyntax == atstyle) {
341 if ((parsetime_error = parsetime(updvals[0], &ds_tv))) {
342 rrd_set_error("ds time: %s: %s", updvals[0], parsetime_error );
343 free(step_start);
344 break;
345 }
346 if (ds_tv.type == RELATIVE_TO_END_TIME ||
347 ds_tv.type == RELATIVE_TO_START_TIME) {
348 rrd_set_error("specifying time relative to the 'start' "
349 "or 'end' makes no sense here: %s",
350 updvals[0]);
351 free(step_start);
352 break;
353 }
355 current_time = mktime(&ds_tv.tm) + ds_tv.offset;
356 } else if (strcmp(updvals[0],"N")==0){
357 current_time = time(NULL);
358 } else {
359 current_time = atol(updvals[0]);
360 }
362 if(current_time <= rrd.live_head->last_up){
363 rrd_set_error("illegal attempt to update using time %ld when "
364 "last update time is %ld (minimum one second step)",
365 current_time, rrd.live_head->last_up);
366 free(step_start);
367 break;
368 }
371 /* seek to the beginning of the rra's */
372 if (rra_current != rra_begin) {
373 if(fseek(rrd_file, rra_begin, SEEK_SET) != 0) {
374 rrd_set_error("seek error in rrd");
375 free(step_start);
376 break;
377 }
378 rra_current = rra_begin;
379 }
380 rra_start = rra_begin;
382 /* when was the current pdp started */
383 proc_pdp_age = rrd.live_head->last_up % rrd.stat_head->pdp_step;
384 proc_pdp_st = rrd.live_head->last_up - proc_pdp_age;
386 /* when did the last pdp_st occur */
387 occu_pdp_age = current_time % rrd.stat_head->pdp_step;
388 occu_pdp_st = current_time - occu_pdp_age;
389 interval = current_time - rrd.live_head->last_up;
391 if (occu_pdp_st > proc_pdp_st){
392 /* OK we passed the pdp_st moment*/
393 pre_int = occu_pdp_st - rrd.live_head->last_up; /* how much of the input data
394 * occurred before the latest
395 * pdp_st moment*/
396 post_int = occu_pdp_age; /* how much after it */
397 } else {
398 pre_int = interval;
399 post_int = 0;
400 }
402 #ifdef DEBUG
403 printf(
404 "proc_pdp_age %lu\t"
405 "proc_pdp_st %lu\t"
406 "occu_pfp_age %lu\t"
407 "occu_pdp_st %lu\t"
408 "int %lu\t"
409 "pre_int %lu\t"
410 "post_int %lu\n", proc_pdp_age, proc_pdp_st,
411 occu_pdp_age, occu_pdp_st,
412 interval, pre_int, post_int);
413 #endif
415 /* process the data sources and update the pdp_prep
416 * area accordingly */
417 for(i=0;i<rrd.stat_head->ds_cnt;i++){
418 enum dst_en dst_idx;
419 dst_idx= dst_conv(rrd.ds_def[i].dst);
420 /* NOTE: DST_CDEF should never enter this if block, because
421 * updvals[i+1][0] is initialized to 'U'; unless the caller
422 * accidently specified a value for the DST_CDEF. To handle
423 * this case, an extra check is required. */
424 if((updvals[i+1][0] != 'U') &&
425 (dst_idx != DST_CDEF) &&
426 rrd.ds_def[i].par[DS_mrhb_cnt].u_cnt >= interval) {
427 double rate = DNAN;
428 /* the data source type defines how to process the data */
429 /* pdp_temp contains rate * time ... eg the bytes
430 * transferred during the interval. Doing it this way saves
431 * a lot of math operations */
434 switch(dst_idx){
435 case DST_COUNTER:
436 case DST_DERIVE:
437 if(rrd.pdp_prep[i].last_ds[0] != 'U'){
438 pdp_new[i]= rrd_diff(updvals[i+1],rrd.pdp_prep[i].last_ds);
439 if(dst_idx == DST_COUNTER) {
		    /* simple overflow catcher suggested by andres kroonmaa */
441 /* this will fail terribly for non 32 or 64 bit counters ... */
442 /* are there any others in SNMP land ? */
443 if (pdp_new[i] < (double)0.0 )
444 pdp_new[i] += (double)4294967296.0 ; /* 2^32 */
445 if (pdp_new[i] < (double)0.0 )
446 pdp_new[i] += (double)18446744069414584320.0; /* 2^64-2^32 */;
447 }
448 rate = pdp_new[i] / interval;
449 }
450 else {
451 pdp_new[i]= DNAN;
452 }
453 break;
454 case DST_ABSOLUTE:
455 pdp_new[i]= atof(updvals[i+1]);
456 rate = pdp_new[i] / interval;
457 break;
458 case DST_GAUGE:
459 pdp_new[i] = atof(updvals[i+1]) * interval;
460 rate = pdp_new[i] / interval;
461 break;
462 default:
463 rrd_set_error("rrd contains unknown DS type : '%s'",
464 rrd.ds_def[i].dst);
465 break;
466 }
467 /* break out of this for loop if the error string is set */
468 if (rrd_test_error()){
469 break;
470 }
471 /* make sure pdp_temp is neither too large or too small
472 * if any of these occur it becomes unknown ...
473 * sorry folks ... */
474 if ( ! isnan(rate) &&
475 (( ! isnan(rrd.ds_def[i].par[DS_max_val].u_val) &&
476 rate > rrd.ds_def[i].par[DS_max_val].u_val ) ||
477 ( ! isnan(rrd.ds_def[i].par[DS_min_val].u_val) &&
478 rate < rrd.ds_def[i].par[DS_min_val].u_val ))){
479 pdp_new[i] = DNAN;
480 }
481 } else {
482 /* no news is news all the same */
483 pdp_new[i] = DNAN;
484 }
486 /* make a copy of the command line argument for the next run */
487 #ifdef DEBUG
488 fprintf(stderr,
489 "prep ds[%lu]\t"
490 "last_arg '%s'\t"
491 "this_arg '%s'\t"
492 "pdp_new %10.2f\n",
493 i,
494 rrd.pdp_prep[i].last_ds,
495 updvals[i+1], pdp_new[i]);
496 #endif
497 if(dst_idx == DST_COUNTER || dst_idx == DST_DERIVE){
498 strncpy(rrd.pdp_prep[i].last_ds,
499 updvals[i+1],LAST_DS_LEN-1);
500 rrd.pdp_prep[i].last_ds[LAST_DS_LEN-1]='\0';
501 }
502 }
503 /* break out of the argument parsing loop if the error_string is set */
504 if (rrd_test_error()){
505 free(step_start);
506 break;
507 }
508 /* has a pdp_st moment occurred since the last run ? */
510 if (proc_pdp_st == occu_pdp_st){
511 /* no we have not passed a pdp_st moment. therefore update is simple */
513 for(i=0;i<rrd.stat_head->ds_cnt;i++){
514 if(isnan(pdp_new[i]))
515 rrd.pdp_prep[i].scratch[PDP_unkn_sec_cnt].u_cnt += interval;
516 else
517 rrd.pdp_prep[i].scratch[PDP_val].u_val+= pdp_new[i];
518 #ifdef DEBUG
519 fprintf(stderr,
520 "NO PDP ds[%lu]\t"
521 "value %10.2f\t"
522 "unkn_sec %5lu\n",
523 i,
524 rrd.pdp_prep[i].scratch[PDP_val].u_val,
525 rrd.pdp_prep[i].scratch[PDP_unkn_sec_cnt].u_cnt);
526 #endif
527 }
528 } else {
	/* a pdp_st has occurred. */
531 /* in pdp_prep[].scratch[PDP_val].u_val we have collected rate*seconds which
532 * occurred up to the last run.
533 pdp_new[] contains rate*seconds from the latest run.
534 pdp_temp[] will contain the rate for cdp */
536 for(i=0;i<rrd.stat_head->ds_cnt;i++){
537 /* update pdp_prep to the current pdp_st */
538 if(isnan(pdp_new[i]))
539 rrd.pdp_prep[i].scratch[PDP_unkn_sec_cnt].u_cnt += pre_int;
540 else
541 rrd.pdp_prep[i].scratch[PDP_val].u_val +=
542 pdp_new[i]/(double)interval*(double)pre_int;
544 /* if too much of the pdp_prep is unknown we dump it */
545 if ((rrd.pdp_prep[i].scratch[PDP_unkn_sec_cnt].u_cnt
546 > rrd.ds_def[i].par[DS_mrhb_cnt].u_cnt) ||
547 (occu_pdp_st-proc_pdp_st <=
548 rrd.pdp_prep[i].scratch[PDP_unkn_sec_cnt].u_cnt)) {
549 pdp_temp[i] = DNAN;
550 } else {
551 pdp_temp[i] = rrd.pdp_prep[i].scratch[PDP_val].u_val
552 / (double)( occu_pdp_st
553 - proc_pdp_st
554 - rrd.pdp_prep[i].scratch[PDP_unkn_sec_cnt].u_cnt);
555 }
557 /* process CDEF data sources; remember each CDEF DS can
558 * only reference other DS with a lower index number */
559 if (dst_conv(rrd.ds_def[i].dst) == DST_CDEF) {
560 rpnp_t *rpnp;
561 rpnp = rpn_expand((rpn_cdefds_t *) &(rrd.ds_def[i].par[DS_cdef]));
		/* substitute data values for OP_VARIABLE nodes */
563 for (ii = 0; rpnp[ii].op != OP_END; ii++)
564 {
565 if (rpnp[ii].op == OP_VARIABLE) {
566 rpnp[ii].op = OP_NUMBER;
567 rpnp[ii].val = pdp_temp[rpnp[ii].ptr];
568 }
569 }
570 /* run the rpn calculator */
571 if (rpn_calc(rpnp,&rpnstack,0,pdp_temp,i) == -1) {
572 free(rpnp);
573 break; /* exits the data sources pdp_temp loop */
574 }
575 }
577 /* make pdp_prep ready for the next run */
578 if(isnan(pdp_new[i])){
579 rrd.pdp_prep[i].scratch[PDP_unkn_sec_cnt].u_cnt = post_int;
580 rrd.pdp_prep[i].scratch[PDP_val].u_val = 0.0;
581 } else {
582 rrd.pdp_prep[i].scratch[PDP_unkn_sec_cnt].u_cnt = 0;
583 rrd.pdp_prep[i].scratch[PDP_val].u_val =
584 pdp_new[i]/(double)interval*(double)post_int;
585 }
587 #ifdef DEBUG
588 fprintf(stderr,
589 "PDP UPD ds[%lu]\t"
590 "pdp_temp %10.2f\t"
591 "new_prep %10.2f\t"
592 "new_unkn_sec %5lu\n",
593 i, pdp_temp[i],
594 rrd.pdp_prep[i].scratch[PDP_val].u_val,
595 rrd.pdp_prep[i].scratch[PDP_unkn_sec_cnt].u_cnt);
596 #endif
597 }
599 /* if there were errors during the last loop, bail out here */
600 if (rrd_test_error()){
601 free(step_start);
602 break;
603 }
605 /* compute the number of elapsed pdp_st moments */
606 elapsed_pdp_st = (occu_pdp_st - proc_pdp_st) / rrd.stat_head -> pdp_step;
607 #ifdef DEBUG
608 fprintf(stderr,"elapsed PDP steps: %lu\n", elapsed_pdp_st);
609 #endif
610 if (rra_step_cnt == NULL)
611 {
612 rra_step_cnt = (unsigned long *)
613 malloc((rrd.stat_head->rra_cnt)* sizeof(unsigned long));
614 }
616 for(i = 0, rra_start = rra_begin;
617 i < rrd.stat_head->rra_cnt;
618 rra_start += rrd.rra_def[i].row_cnt * rrd.stat_head -> ds_cnt * sizeof(rrd_value_t),
619 i++)
620 {
621 current_cf = cf_conv(rrd.rra_def[i].cf_nam);
622 start_pdp_offset = rrd.rra_def[i].pdp_cnt -
623 (proc_pdp_st / rrd.stat_head -> pdp_step) % rrd.rra_def[i].pdp_cnt;
624 if (start_pdp_offset <= elapsed_pdp_st) {
625 rra_step_cnt[i] = (elapsed_pdp_st - start_pdp_offset) /
626 rrd.rra_def[i].pdp_cnt + 1;
627 } else {
628 rra_step_cnt[i] = 0;
629 }
631 if (current_cf == CF_SEASONAL || current_cf == CF_DEVSEASONAL)
632 {
633 /* If this is a bulk update, we need to skip ahead in the seasonal
634 * arrays so that they will be correct for the next observed value;
635 * note that for the bulk update itself, no update will occur to
636 * DEVSEASONAL or SEASONAL; futhermore, HWPREDICT and DEVPREDICT will
637 * be set to DNAN. */
638 if (rra_step_cnt[i] > 2)
639 {
640 /* skip update by resetting rra_step_cnt[i],
641 * note that this is not data source specific; this is due
642 * to the bulk update, not a DNAN value for the specific data
643 * source. */
644 rra_step_cnt[i] = 0;
645 lookup_seasonal(&rrd,i,rra_start,rrd_file,elapsed_pdp_st,
646 &last_seasonal_coef);
647 lookup_seasonal(&rrd,i,rra_start,rrd_file,elapsed_pdp_st + 1,
648 &seasonal_coef);
649 }
651 /* periodically run a smoother for seasonal effects */
652 /* Need to use first cdp parameter buffer to track
653 * burnin (burnin requires a specific smoothing schedule).
654 * The CDP_init_seasonal parameter is really an RRA level,
655 * not a data source within RRA level parameter, but the rra_def
656 * is read only for rrd_update (not flushed to disk). */
657 iii = i*(rrd.stat_head -> ds_cnt);
658 if (rrd.cdp_prep[iii].scratch[CDP_init_seasonal].u_cnt
659 <= BURNIN_CYCLES)
660 {
661 if (rrd.rra_ptr[i].cur_row + elapsed_pdp_st
662 > rrd.rra_def[i].row_cnt - 1) {
663 /* mark off one of the burnin cycles */
664 ++(rrd.cdp_prep[iii].scratch[CDP_init_seasonal].u_cnt);
665 schedule_smooth = 1;
666 }
667 } else {
668 /* someone has no doubt invented a trick to deal with this
669 * wrap around, but at least this code is clear. */
670 if (rrd.rra_def[i].par[RRA_seasonal_smooth_idx].u_cnt >
671 rrd.rra_ptr[i].cur_row)
672 {
673 /* here elapsed_pdp_st = rra_step_cnt[i] because of 1-1
674 * mapping between PDP and CDP */
675 if (rrd.rra_ptr[i].cur_row + elapsed_pdp_st
676 >= rrd.rra_def[i].par[RRA_seasonal_smooth_idx].u_cnt)
677 {
678 #ifdef DEBUG
679 fprintf(stderr,
680 "schedule_smooth 1: cur_row %lu, elapsed_pdp_st %lu, smooth idx %lu\n",
681 rrd.rra_ptr[i].cur_row, elapsed_pdp_st,
682 rrd.rra_def[i].par[RRA_seasonal_smooth_idx].u_cnt);
683 #endif
684 schedule_smooth = 1;
685 }
686 } else {
687 /* can't rely on negative numbers because we are working with
688 * unsigned values */
689 /* Don't need modulus here. If we've wrapped more than once, only
690 * one smooth is executed at the end. */
691 if (rrd.rra_ptr[i].cur_row + elapsed_pdp_st >= rrd.rra_def[i].row_cnt
692 && rrd.rra_ptr[i].cur_row + elapsed_pdp_st - rrd.rra_def[i].row_cnt
693 >= rrd.rra_def[i].par[RRA_seasonal_smooth_idx].u_cnt)
694 {
695 #ifdef DEBUG
696 fprintf(stderr,
697 "schedule_smooth 2: cur_row %lu, elapsed_pdp_st %lu, smooth idx %lu\n",
698 rrd.rra_ptr[i].cur_row, elapsed_pdp_st,
699 rrd.rra_def[i].par[RRA_seasonal_smooth_idx].u_cnt);
700 #endif
701 schedule_smooth = 1;
702 }
703 }
704 }
706 rra_current = ftell(rrd_file);
707 } /* if cf is DEVSEASONAL or SEASONAL */
709 if (rrd_test_error()) break;
711 /* update CDP_PREP areas */
	    /* loop over data sources within each RRA */
713 for(ii = 0;
714 ii < rrd.stat_head->ds_cnt;
715 ii++)
716 {
718 /* iii indexes the CDP prep area for this data source within the RRA */
719 iii=i*rrd.stat_head->ds_cnt+ii;
721 if (rrd.rra_def[i].pdp_cnt > 1) {
723 if (rra_step_cnt[i] > 0) {
		/* If we are in this block, at least 1 CDP value will be written to
725 * disk, this is the CDP_primary_val entry. If more than 1 value needs
726 * to be written, then the "fill in" value is the CDP_secondary_val
727 * entry. */
728 if (isnan(pdp_temp[ii]))
729 {
730 rrd.cdp_prep[iii].scratch[CDP_unkn_pdp_cnt].u_cnt += start_pdp_offset;
731 rrd.cdp_prep[iii].scratch[CDP_secondary_val].u_val = DNAN;
732 } else {
733 /* CDP_secondary value is the RRA "fill in" value for intermediary
734 * CDP data entries. No matter the CF, the value is the same because
735 * the average, max, min, and last of a list of identical values is
736 * the same, namely, the value itself. */
737 rrd.cdp_prep[iii].scratch[CDP_secondary_val].u_val = pdp_temp[ii];
738 }
740 if (rrd.cdp_prep[iii].scratch[CDP_unkn_pdp_cnt].u_cnt
741 > rrd.rra_def[i].pdp_cnt*
742 rrd.rra_def[i].par[RRA_cdp_xff_val].u_val)
743 {
744 rrd.cdp_prep[iii].scratch[CDP_primary_val].u_val = DNAN;
745 /* initialize carry over */
746 if (current_cf == CF_AVERAGE) {
747 if (isnan(pdp_temp[ii])) {
748 rrd.cdp_prep[iii].scratch[CDP_val].u_val = DNAN;
749 } else {
750 rrd.cdp_prep[iii].scratch[CDP_val].u_val = pdp_temp[ii] *
751 ((elapsed_pdp_st - start_pdp_offset) % rrd.rra_def[i].pdp_cnt);
752 }
753 } else {
754 rrd.cdp_prep[iii].scratch[CDP_val].u_val = pdp_temp[ii];
755 }
756 } else {
757 rrd_value_t cum_val, cur_val;
758 switch (current_cf) {
759 case CF_AVERAGE:
760 cum_val = IFDNAN(rrd.cdp_prep[iii].scratch[CDP_val].u_val, 0.0);
761 cur_val = IFDNAN(pdp_temp[ii],0.0);
762 rrd.cdp_prep[iii].scratch[CDP_primary_val].u_val =
763 (cum_val + cur_val) /
764 (rrd.rra_def[i].pdp_cnt
765 -rrd.cdp_prep[iii].scratch[CDP_unkn_pdp_cnt].u_cnt);
766 /* initialize carry over value */
767 if (isnan(pdp_temp[ii])) {
768 rrd.cdp_prep[iii].scratch[CDP_val].u_val = DNAN;
769 } else {
770 rrd.cdp_prep[iii].scratch[CDP_val].u_val = pdp_temp[ii] *
771 ((elapsed_pdp_st - start_pdp_offset) % rrd.rra_def[i].pdp_cnt);
772 }
773 break;
774 case CF_MAXIMUM:
775 cum_val = IFDNAN(rrd.cdp_prep[iii].scratch[CDP_val].u_val, -DINF);
776 cur_val = IFDNAN(pdp_temp[ii],-DINF);
777 #ifdef DEBUG
778 if (isnan(rrd.cdp_prep[iii].scratch[CDP_val].u_val) &&
779 isnan(pdp_temp[ii])) {
780 fprintf(stderr,
781 "RRA %lu, DS %lu, both CDP_val and pdp_temp are DNAN!",
782 i,ii);
783 exit(-1);
784 }
785 #endif
786 if (cur_val > cum_val)
787 rrd.cdp_prep[iii].scratch[CDP_primary_val].u_val = cur_val;
788 else
789 rrd.cdp_prep[iii].scratch[CDP_primary_val].u_val = cum_val;
790 /* initialize carry over value */
791 rrd.cdp_prep[iii].scratch[CDP_val].u_val = pdp_temp[ii];
792 break;
793 case CF_MINIMUM:
794 cum_val = IFDNAN(rrd.cdp_prep[iii].scratch[CDP_val].u_val, DINF);
795 cur_val = IFDNAN(pdp_temp[ii],DINF);
796 #ifdef DEBUG
797 if (isnan(rrd.cdp_prep[iii].scratch[CDP_val].u_val) &&
798 isnan(pdp_temp[ii])) {
799 fprintf(stderr,
800 "RRA %lu, DS %lu, both CDP_val and pdp_temp are DNAN!",
801 i,ii);
802 exit(-1);
803 }
804 #endif
805 if (cur_val < cum_val)
806 rrd.cdp_prep[iii].scratch[CDP_primary_val].u_val = cur_val;
807 else
808 rrd.cdp_prep[iii].scratch[CDP_primary_val].u_val = cum_val;
809 /* initialize carry over value */
810 rrd.cdp_prep[iii].scratch[CDP_val].u_val = pdp_temp[ii];
811 break;
812 case CF_LAST:
813 default:
814 rrd.cdp_prep[iii].scratch[CDP_primary_val].u_val = pdp_temp[ii];
815 /* initialize carry over value */
816 rrd.cdp_prep[iii].scratch[CDP_val].u_val = pdp_temp[ii];
817 break;
818 }
819 } /* endif meets xff value requirement for a valid value */
820 /* initialize carry over CDP_unkn_pdp_cnt, this must after CDP_primary_val
821 * is set because CDP_unkn_pdp_cnt is required to compute that value. */
822 if (isnan(pdp_temp[ii]))
823 rrd.cdp_prep[iii].scratch[CDP_unkn_pdp_cnt].u_cnt =
824 (elapsed_pdp_st - start_pdp_offset) % rrd.rra_def[i].pdp_cnt;
825 else
826 rrd.cdp_prep[iii].scratch[CDP_unkn_pdp_cnt].u_cnt = 0;
827 } else /* rra_step_cnt[i] == 0 */
828 {
829 #ifdef DEBUG
830 if (isnan(rrd.cdp_prep[iii].scratch[CDP_val].u_val)) {
831 fprintf(stderr,"schedule CDP_val update, RRA %lu DS %lu, DNAN\n",
832 i,ii);
833 } else {
834 fprintf(stderr,"schedule CDP_val update, RRA %lu DS %lu, %10.2f\n",
835 i,ii,rrd.cdp_prep[iii].scratch[CDP_val].u_val);
836 }
837 #endif
838 if (isnan(pdp_temp[ii])) {
839 rrd.cdp_prep[iii].scratch[CDP_unkn_pdp_cnt].u_cnt += elapsed_pdp_st;
840 } else if (isnan(rrd.cdp_prep[iii].scratch[CDP_val].u_val))
841 {
842 if (current_cf == CF_AVERAGE) {
843 rrd.cdp_prep[iii].scratch[CDP_val].u_val = pdp_temp[ii] *
844 elapsed_pdp_st;
845 } else {
846 rrd.cdp_prep[iii].scratch[CDP_val].u_val = pdp_temp[ii];
847 }
848 #ifdef DEBUG
849 fprintf(stderr,"Initialize CDP_val for RRA %lu DS %lu: %10.2f\n",
850 i,ii,rrd.cdp_prep[iii].scratch[CDP_val].u_val);
851 #endif
852 } else {
853 switch (current_cf) {
854 case CF_AVERAGE:
855 rrd.cdp_prep[iii].scratch[CDP_val].u_val += pdp_temp[ii] *
856 elapsed_pdp_st;
857 break;
858 case CF_MINIMUM:
859 if (pdp_temp[ii] < rrd.cdp_prep[iii].scratch[CDP_val].u_val)
860 rrd.cdp_prep[iii].scratch[CDP_val].u_val = pdp_temp[ii];
861 break;
862 case CF_MAXIMUM:
863 if (pdp_temp[ii] > rrd.cdp_prep[iii].scratch[CDP_val].u_val)
864 rrd.cdp_prep[iii].scratch[CDP_val].u_val = pdp_temp[ii];
865 break;
866 case CF_LAST:
867 default:
868 rrd.cdp_prep[iii].scratch[CDP_val].u_val = pdp_temp[ii];
869 break;
870 }
871 }
872 }
873 } else { /* rrd.rra_def[i].pdp_cnt == 1 */
874 if (elapsed_pdp_st > 2)
875 {
876 switch (current_cf) {
877 case CF_AVERAGE:
878 default:
879 rrd.cdp_prep[iii].scratch[CDP_primary_val].u_val=pdp_temp[ii];
880 rrd.cdp_prep[iii].scratch[CDP_secondary_val].u_val=pdp_temp[ii];
881 break;
882 case CF_SEASONAL:
883 case CF_DEVSEASONAL:
884 /* need to update cached seasonal values, so they are consistent
885 * with the bulk update */
886 /* WARNING: code relies on the fact that CDP_hw_last_seasonal and
887 * CDP_last_deviation are the same. */
888 rrd.cdp_prep[iii].scratch[CDP_hw_last_seasonal].u_val =
889 last_seasonal_coef[ii];
890 rrd.cdp_prep[iii].scratch[CDP_hw_seasonal].u_val =
891 seasonal_coef[ii];
892 break;
893 case CF_HWPREDICT:
894 /* need to update the null_count and last_null_count.
895 * even do this for non-DNAN pdp_temp because the
896 * algorithm is not learning from batch updates. */
897 rrd.cdp_prep[iii].scratch[CDP_null_count].u_cnt +=
898 elapsed_pdp_st;
899 rrd.cdp_prep[iii].scratch[CDP_last_null_count].u_cnt +=
900 elapsed_pdp_st - 1;
901 /* fall through */
902 case CF_DEVPREDICT:
903 rrd.cdp_prep[iii].scratch[CDP_primary_val].u_val = DNAN;
904 rrd.cdp_prep[iii].scratch[CDP_secondary_val].u_val = DNAN;
905 break;
906 case CF_FAILURES:
907 /* do not count missed bulk values as failures */
908 rrd.cdp_prep[iii].scratch[CDP_primary_val].u_val = 0;
909 rrd.cdp_prep[iii].scratch[CDP_secondary_val].u_val = 0;
910 /* need to reset violations buffer.
911 * could do this more carefully, but for now, just
912 * assume a bulk update wipes away all violations. */
913 erase_violations(&rrd, iii, i);
914 break;
915 }
916 }
917 } /* endif rrd.rra_def[i].pdp_cnt == 1 */
919 if (rrd_test_error()) break;
921 } /* endif data sources loop */
922 } /* end RRA Loop */
924 /* this loop is only entered if elapsed_pdp_st < 3 */
925 for (j = elapsed_pdp_st, scratch_idx = CDP_primary_val;
926 j > 0 && j < 3; j--, scratch_idx = CDP_secondary_val)
927 {
928 for(i = 0, rra_start = rra_begin;
929 i < rrd.stat_head->rra_cnt;
930 rra_start += rrd.rra_def[i].row_cnt * rrd.stat_head -> ds_cnt * sizeof(rrd_value_t),
931 i++)
932 {
933 if (rrd.rra_def[i].pdp_cnt > 1) continue;
935 current_cf = cf_conv(rrd.rra_def[i].cf_nam);
936 if (current_cf == CF_SEASONAL || current_cf == CF_DEVSEASONAL)
937 {
938 lookup_seasonal(&rrd,i,rra_start,rrd_file,
939 elapsed_pdp_st + (scratch_idx == CDP_primary_val ? 1 : 2),
940 &seasonal_coef);
941 rra_current = ftell(rrd_file);
942 }
943 if (rrd_test_error()) break;
	    /* loop over data sources within each RRA */
945 for(ii = 0;
946 ii < rrd.stat_head->ds_cnt;
947 ii++)
948 {
949 update_aberrant_CF(&rrd,pdp_temp[ii],current_cf,
950 i*(rrd.stat_head->ds_cnt) + ii,i,ii,
951 scratch_idx, seasonal_coef);
952 }
953 } /* end RRA Loop */
954 if (rrd_test_error()) break;
955 } /* end elapsed_pdp_st loop */
957 if (rrd_test_error()) break;
959 /* Ready to write to disk */
960 /* Move sequentially through the file, writing one RRA at a time.
961 * Note this architecture divorces the computation of CDP with
962 * flushing updated RRA entries to disk. */
963 for(i = 0, rra_start = rra_begin;
964 i < rrd.stat_head->rra_cnt;
965 rra_start += rrd.rra_def[i].row_cnt * rrd.stat_head -> ds_cnt * sizeof(rrd_value_t),
966 i++) {
967 /* is there anything to write for this RRA? If not, continue. */
968 if (rra_step_cnt[i] == 0) continue;
970 /* write the first row */
971 #ifdef DEBUG
972 fprintf(stderr," -- RRA Preseek %ld\n",ftell(rrd_file));
973 #endif
974 rrd.rra_ptr[i].cur_row++;
975 if (rrd.rra_ptr[i].cur_row >= rrd.rra_def[i].row_cnt)
976 rrd.rra_ptr[i].cur_row = 0; /* wrap around */
	    /* position on the first row */
978 rra_pos_tmp = rra_start +
979 (rrd.stat_head->ds_cnt)*(rrd.rra_ptr[i].cur_row)*sizeof(rrd_value_t);
980 if(rra_pos_tmp != rra_current) {
981 if(fseek(rrd_file, rra_pos_tmp, SEEK_SET) != 0){
982 rrd_set_error("seek error in rrd");
983 break;
984 }
985 rra_current = rra_pos_tmp;
986 }
988 #ifdef DEBUG
989 fprintf(stderr," -- RRA Postseek %ld\n",ftell(rrd_file));
990 #endif
991 scratch_idx = CDP_primary_val;
992 write_RRA_row(&rrd, i, &rra_current, scratch_idx, rrd_file);
993 if (rrd_test_error()) break;
995 /* write other rows of the bulk update, if any */
996 scratch_idx = CDP_secondary_val;
997 for ( ; rra_step_cnt[i] > 1;
998 rra_step_cnt[i]--, rrd.rra_ptr[i].cur_row++)
999 {
1000 if (rrd.rra_ptr[i].cur_row == rrd.rra_def[i].row_cnt)
1001 {
1002 #ifdef DEBUG
1003 fprintf(stderr,"Wraparound for RRA %s, %lu updates left\n",
1004 rrd.rra_def[i].cf_nam, rra_step_cnt[i] - 1);
1005 #endif
1006 /* wrap */
1007 rrd.rra_ptr[i].cur_row = 0;
1008 /* seek back to beginning of current rra */
1009 if (fseek(rrd_file, rra_start, SEEK_SET) != 0)
1010 {
1011 rrd_set_error("seek error in rrd");
1012 break;
1013 }
1014 #ifdef DEBUG
1015 fprintf(stderr," -- Wraparound Postseek %ld\n",ftell(rrd_file));
1016 #endif
1017 rra_current = rra_start;
1018 }
1019 write_RRA_row(&rrd, i, &rra_current, scratch_idx, rrd_file);
1020 }
1022 if (rrd_test_error())
1023 break;
1024 } /* RRA LOOP */
1026 /* break out of the argument parsing loop if error_string is set */
1027 if (rrd_test_error()){
1028 free(step_start);
1029 break;
1030 }
1032 } /* endif a pdp_st has occurred */
1033 rrd.live_head->last_up = current_time;
1034 free(step_start);
1035 } /* function argument loop */
1037 if (seasonal_coef != NULL) free(seasonal_coef);
1038 if (last_seasonal_coef != NULL) free(last_seasonal_coef);
1039 if (rra_step_cnt != NULL) free(rra_step_cnt);
1040 rpnstack_free(&rpnstack);
1042 /* if we got here and if there is an error and if the file has not been
1043 * written to, then close things up and return. */
1044 if (rrd_test_error()) {
1045 free(updvals);
1046 free(tmpl_idx);
1047 rrd_free(&rrd);
1048 free(pdp_temp);
1049 free(pdp_new);
1050 fclose(rrd_file);
1051 return(-1);
1052 }
1054 /* aargh ... that was tough ... so many loops ... anyway, its done.
1055 * we just need to write back the live header portion now*/
1057 if (fseek(rrd_file, (sizeof(stat_head_t)
1058 + sizeof(ds_def_t)*rrd.stat_head->ds_cnt
1059 + sizeof(rra_def_t)*rrd.stat_head->rra_cnt),
1060 SEEK_SET) != 0) {
1061 rrd_set_error("seek rrd for live header writeback");
1062 free(updvals);
1063 free(tmpl_idx);
1064 rrd_free(&rrd);
1065 free(pdp_temp);
1066 free(pdp_new);
1067 fclose(rrd_file);
1068 return(-1);
1069 }
1071 if(fwrite( rrd.live_head,
1072 sizeof(live_head_t), 1, rrd_file) != 1){
1073 rrd_set_error("fwrite live_head to rrd");
1074 free(updvals);
1075 rrd_free(&rrd);
1076 free(tmpl_idx);
1077 free(pdp_temp);
1078 free(pdp_new);
1079 fclose(rrd_file);
1080 return(-1);
1081 }
1083 if(fwrite( rrd.pdp_prep,
1084 sizeof(pdp_prep_t),
1085 rrd.stat_head->ds_cnt, rrd_file) != rrd.stat_head->ds_cnt){
1086 rrd_set_error("ftwrite pdp_prep to rrd");
1087 free(updvals);
1088 rrd_free(&rrd);
1089 free(tmpl_idx);
1090 free(pdp_temp);
1091 free(pdp_new);
1092 fclose(rrd_file);
1093 return(-1);
1094 }
1096 if(fwrite( rrd.cdp_prep,
1097 sizeof(cdp_prep_t),
1098 rrd.stat_head->rra_cnt *rrd.stat_head->ds_cnt, rrd_file)
1099 != rrd.stat_head->rra_cnt *rrd.stat_head->ds_cnt){
1101 rrd_set_error("ftwrite cdp_prep to rrd");
1102 free(updvals);
1103 free(tmpl_idx);
1104 rrd_free(&rrd);
1105 free(pdp_temp);
1106 free(pdp_new);
1107 fclose(rrd_file);
1108 return(-1);
1109 }
1111 if(fwrite( rrd.rra_ptr,
1112 sizeof(rra_ptr_t),
1113 rrd.stat_head->rra_cnt,rrd_file) != rrd.stat_head->rra_cnt){
1114 rrd_set_error("fwrite rra_ptr to rrd");
1115 free(updvals);
1116 free(tmpl_idx);
1117 rrd_free(&rrd);
1118 free(pdp_temp);
1119 free(pdp_new);
1120 fclose(rrd_file);
1121 return(-1);
1122 }
1124 /* OK now close the files and free the memory */
1125 if(fclose(rrd_file) != 0){
1126 rrd_set_error("closing rrd");
1127 free(updvals);
1128 free(tmpl_idx);
1129 rrd_free(&rrd);
1130 free(pdp_temp);
1131 free(pdp_new);
1132 return(-1);
1133 }
1135 /* calling the smoothing code here guarantees at most
1136 * one smoothing operation per rrd_update call. Unfortunately,
1137 * it is possible with bulk updates, or a long-delayed update
1138 * for smoothing to occur off-schedule. This really isn't
1139 * critical except during the burning cycles. */
1140 if (schedule_smooth)
1141 {
1142 #ifndef WIN32
1143 rrd_file = fopen(argv[optind],"r+");
1144 #else
1145 rrd_file = fopen(argv[optind],"rb+");
1146 #endif
1147 rra_start = rra_begin;
1148 for (i = 0; i < rrd.stat_head -> rra_cnt; ++i)
1149 {
1150 if (cf_conv(rrd.rra_def[i].cf_nam) == CF_DEVSEASONAL ||
1151 cf_conv(rrd.rra_def[i].cf_nam) == CF_SEASONAL)
1152 {
1153 #ifdef DEBUG
1154 fprintf(stderr,"Running smoother for rra %ld\n",i);
1155 #endif
1156 apply_smoother(&rrd,i,rra_start,rrd_file);
1157 if (rrd_test_error())
1158 break;
1159 }
1160 rra_start += rrd.rra_def[i].row_cnt
1161 *rrd.stat_head->ds_cnt*sizeof(rrd_value_t);
1162 }
1163 fclose(rrd_file);
1164 }
1165 rrd_free(&rrd);
1166 free(updvals);
1167 free(tmpl_idx);
1168 free(pdp_new);
1169 free(pdp_temp);
1170 return(0);
1171 }
/*
 * Get an exclusive (write) lock on the whole RRD file.
 *
 * The lock attempt is non-blocking (F_SETLK / _LK_NBLCK): if another
 * process already holds a conflicting lock we fail immediately instead
 * of waiting. The lock is released automatically when the file is closed.
 *
 * returns 0 on success, non-zero on failure
 */
int
LockRRD(FILE *rrdfile)
{
    int rrd_fd;    /* file descriptor for the RRD */
    int rc;        /* result of the locking call; 'rc' rather than 'stat'
                    * to avoid shadowing the POSIX stat() function */

    rrd_fd = fileno(rrdfile);

#ifndef WIN32
    {
        struct flock lock;
        lock.l_type   = F_WRLCK;   /* exclusive write lock */
        lock.l_len    = 0;         /* l_len == 0: lock extends to EOF ... */
        lock.l_start  = 0;         /* ... starting at offset 0 = whole file */
        lock.l_whence = SEEK_SET;  /* l_start is relative to start of file */

        rc = fcntl(rrd_fd, F_SETLK, &lock);
    }
#else
    {
        struct _stat st;

        /* NOTE(review): _locking() locks st.st_size bytes from the CURRENT
         * file position, so this covers the whole file only while the file
         * pointer is at offset 0 -- confirm against callers. */
        if ( _fstat( rrd_fd, &st ) == 0 ) {
            rc = _locking ( rrd_fd, _LK_NBLCK, st.st_size );
        } else {
            rc = -1;
        }
    }
#endif

    return(rc);
}
1211 void
1212 write_RRA_row (rrd_t *rrd, unsigned long rra_idx, unsigned long *rra_current,
1213 unsigned short CDP_scratch_idx, FILE *rrd_file)
1214 {
1215 unsigned long ds_idx, cdp_idx;
1217 for (ds_idx = 0; ds_idx < rrd -> stat_head -> ds_cnt; ds_idx++)
1218 {
1219 /* compute the cdp index */
1220 cdp_idx =rra_idx * (rrd -> stat_head->ds_cnt) + ds_idx;
1221 #ifdef DEBUG
1222 fprintf(stderr," -- RRA WRITE VALUE %e, at %ld CF:%s\n",
1223 rrd -> cdp_prep[cdp_idx].scratch[CDP_scratch_idx].u_val,ftell(rrd_file),
1224 rrd -> rra_def[rra_idx].cf_nam);
1225 #endif
1227 if(fwrite(&(rrd -> cdp_prep[cdp_idx].scratch[CDP_scratch_idx].u_val),
1228 sizeof(rrd_value_t),1,rrd_file) != 1)
1229 {
1230 rrd_set_error("writing rrd");
1231 return;
1232 }
1233 *rra_current += sizeof(rrd_value_t);
1234 }
1235 }