1 /*****************************************************************************
2 * RRDtool 1.0.33 Copyright Tobias Oetiker, 1997 - 2000
3 *****************************************************************************
4 * rrd_update.c RRD Update Function
5 *****************************************************************************
6 * $Id$
7 * $Log$
8 * Revision 1.5 2001/05/09 05:31:01 oetiker
9 * Bug fix: when update of multiple PDP/CDP RRAs coincided
10 * with interpolation of multiple PDPs an incorrect value was
11 * stored as the CDP. Especially evident for GAUGE data sources.
12 * Minor changes to rrdcreate.pod. -- Jake Brutlag <jakeb@corp.webtv.net>
13 *
14 * Revision 1.4 2001/03/10 23:54:41 oetiker
15 * Support for COMPUTE data sources (CDEF data sources). Removes the RPN
16 * parser and calculator from rrd_graph and puts then in a new file,
17 * rrd_rpncalc.c. Changes to core files rrd_create and rrd_update. Some
18 * clean-up of aberrant behavior stuff, including a bug fix.
19 * Documentation update (rrdcreate.pod, rrdupdate.pod). Change xml format.
20 * -- Jake Brutlag <jakeb@corp.webtv.net>
21 *
22 * Revision 1.3 2001/03/04 13:01:55 oetiker
23 * Aberrant Behavior Detection support. A brief overview added to rrdtool.pod.
24 * Major updates to rrd_update.c, rrd_create.c. Minor update to other core files.
25 * This is backwards compatible! But new files using the Aberrant stuff are not readable
26 * by old rrdtool versions. See http://cricket.sourceforge.net/aberrant/rrd_hw.htm
27 * -- Jake Brutlag <jakeb@corp.webtv.net>
28 *
29 * Revision 1.2 2001/03/04 11:14:25 oetiker
30 * added at-style-time@value:value syntax to rrd_update
31 * -- Dave Bodenstab <imdave@mcs.net>
32 *
33 * Revision 1.1.1.1 2001/02/25 22:25:06 oetiker
34 * checkin
35 *
36 *****************************************************************************/
38 #include "rrd_tool.h"
39 #include <sys/types.h>
40 #include <fcntl.h>
41 #include "rrd_hw.h"
42 #include "rrd_rpncalc.h"
44 #ifdef WIN32
45 #include <sys/locking.h>
46 #include <sys/stat.h>
47 #include <io.h>
48 #endif
50 /* Local prototypes */
51 int LockRRD(FILE *rrd_file);
52 void write_RRA_row (rrd_t *rrd, unsigned long rra_idx, unsigned long *rra_current,
53 unsigned short CDP_scratch_idx, FILE *rrd_file);
/*
 * IFDNAN(X, Y): yields Y when X is NaN, otherwise yields X.
 *
 * The expansion is a single parenthesized expression WITHOUT a trailing
 * semicolon, so the macro can be used inside larger expressions; the
 * caller writes its own statement terminator.  X is evaluated twice
 * (once by isnan(), once as the result), so it must be free of side
 * effects.
 */
#define IFDNAN(X,Y) (isnan(X) ? (Y) : (X))
#ifdef STANDALONE
/*
 * Stand-alone entry point for the update tool.
 *
 * Hands the command line to rrd_update().  Success is judged by the
 * library error flag rather than by rrd_update()'s return value: when the
 * flag is clear the process exits with 0; when it is set the usage text
 * and the pending error message are printed, the flag is cleared and the
 * process exits with 1.
 */
int
main(int argc, char **argv)
{
    rrd_update(argc, argv);

    /* nothing went wrong: we are done */
    if (!rrd_test_error()) {
        return 0;
    }

    printf("RRDtool 1.0.33 Copyright 1997-2000 by Tobias Oetiker <tobi@oetiker.ch>\n\n"
           "Usage: rrdupdate filename\n"
           "\t\t\t[--template|-t ds-name:ds-name:...]\n"
           "\t\t\ttime|N:value[:value...]\n\n"
           "\t\t\tat-time@value[:value...]\n\n"
           "\t\t\t[ time:value[:value...] ..]\n\n");

    printf("ERROR: %s\n", rrd_get_error());
    rrd_clear_error();
    return 1;
}
#endif
78 int
79 rrd_update(int argc, char **argv)
80 {
82 int arg_i = 2;
83 short j;
84 long i,ii,iii=1;
86 unsigned long rra_begin; /* byte pointer to the rra
87 * area in the rrd file. this
88 * pointer never changes value */
89 unsigned long rra_start; /* byte pointer to the rra
90 * area in the rrd file. this
                                 * pointer changes as each rra is
92 * processed. */
93 unsigned long rra_current; /* byte pointer to the current write
94 * spot in the rrd file. */
95 unsigned long rra_pos_tmp; /* temporary byte pointer. */
96 unsigned long interval,
97 pre_int,post_int; /* interval between this and
98 * the last run */
99 unsigned long proc_pdp_st; /* which pdp_st was the last
100 * to be processed */
101 unsigned long occu_pdp_st; /* when was the pdp_st
102 * before the last update
103 * time */
104 unsigned long proc_pdp_age; /* how old was the data in
105 * the pdp prep area when it
106 * was last updated */
107 unsigned long occu_pdp_age; /* how long ago was the last
108 * pdp_step time */
109 rrd_value_t *pdp_new; /* prepare the incoming data
                                 * to be added to the
111 * existing entry */
112 rrd_value_t *pdp_temp; /* prepare the pdp values
                                 * to be added to the
114 * cdp values */
116 long *tmpl_idx; /* index representing the settings
117 transported by the template index */
118 long tmpl_cnt = 2; /* time and data */
120 FILE *rrd_file;
121 rrd_t rrd;
122 time_t current_time = time(NULL);
123 char **updvals;
124 int schedule_smooth = 0;
125 char *template = NULL;
126 rrd_value_t *seasonal_coef = NULL, *last_seasonal_coef = NULL;
127 /* a vector of future Holt-Winters seasonal coefs */
128 unsigned long elapsed_pdp_st;
129 /* number of elapsed PDP steps since last update */
130 unsigned long *rra_step_cnt = NULL;
131 /* number of rows to be updated in an RRA for a data
132 * value. */
133 unsigned long start_pdp_offset;
134 /* number of PDP steps since the last update that
135 * are assigned to the first CDP to be generated
136 * since the last update. */
137 unsigned short scratch_idx;
138 /* index into the CDP scratch array */
139 enum cf_en current_cf;
140 /* numeric id of the current consolidation function */
141 rpnstack_t rpnstack; /* used for COMPUTE DS */
143 rpnstack_init(&rpnstack);
145 while (1) {
146 static struct option long_options[] =
147 {
148 {"template", required_argument, 0, 't'},
149 {0,0,0,0}
150 };
151 int option_index = 0;
152 int opt;
153 opt = getopt_long(argc, argv, "t:",
154 long_options, &option_index);
156 if (opt == EOF)
157 break;
159 switch(opt) {
160 case 't':
161 template = optarg;
162 break;
164 case '?':
165 rrd_set_error("unknown option '%s'",argv[optind-1]);
166 rrd_free(&rrd);
167 return(-1);
168 }
169 }
171 /* need at least 2 arguments: filename, data. */
172 if (argc-optind < 2) {
173 rrd_set_error("Not enough arguments");
174 return -1;
175 }
177 if(rrd_open(argv[optind],&rrd_file,&rrd, RRD_READWRITE)==-1){
178 return -1;
179 }
180 rra_current = rra_start = rra_begin = ftell(rrd_file);
/* This is defined in the ANSI C standard, section 7.9.5.3:
 *
 *   When a file is opened with update mode ('+' as the second
 *   or third character in the ... list of mode argument
 *   variables), both input and output may be performed on the
 *   associated stream. However, ... input may not be directly
 *   followed by output without an intervening call to a file
 *   positioning function, unless the input operation encounters
 *   end-of-file. */
190 fseek(rrd_file, 0, SEEK_CUR);
193 /* get exclusive lock to whole file.
194 * lock gets removed when we close the file.
195 */
196 if (LockRRD(rrd_file) != 0) {
197 rrd_set_error("could not lock RRD");
198 rrd_free(&rrd);
199 fclose(rrd_file);
200 return(-1);
201 }
203 if((updvals = malloc( sizeof(char*) * (rrd.stat_head->ds_cnt+1)))==NULL){
204 rrd_set_error("allocating updvals pointer array");
205 rrd_free(&rrd);
206 fclose(rrd_file);
207 return(-1);
208 }
210 if ((pdp_temp = malloc(sizeof(rrd_value_t)
211 *rrd.stat_head->ds_cnt))==NULL){
212 rrd_set_error("allocating pdp_temp ...");
213 free(updvals);
214 rrd_free(&rrd);
215 fclose(rrd_file);
216 return(-1);
217 }
219 if ((tmpl_idx = malloc(sizeof(unsigned long)
220 *(rrd.stat_head->ds_cnt+1)))==NULL){
221 rrd_set_error("allocating tmpl_idx ...");
222 free(pdp_temp);
223 free(updvals);
224 rrd_free(&rrd);
225 fclose(rrd_file);
226 return(-1);
227 }
228 /* initialize template redirector */
229 /* default config example (assume DS 1 is a CDEF DS)
230 tmpl_idx[0] -> 0; (time)
231 tmpl_idx[1] -> 1; (DS 0)
232 tmpl_idx[2] -> 3; (DS 2)
233 tmpl_idx[3] -> 4; (DS 3) */
234 tmpl_idx[0] = 0; /* time */
235 for (i = 1, ii = 1 ; i <= rrd.stat_head->ds_cnt ; i++)
236 {
237 if (dst_conv(rrd.ds_def[i-1].dst) != DST_CDEF)
238 tmpl_idx[ii++]=i;
239 }
240 tmpl_cnt= ii;
242 if (template) {
243 char *dsname;
244 int tmpl_len;
245 dsname = template;
246 tmpl_cnt = 1; /* the first entry is the time */
247 tmpl_len = strlen(template);
248 for(i=0;i<=tmpl_len ;i++) {
249 if (template[i] == ':' || template[i] == '\0') {
250 template[i] = '\0';
251 if (tmpl_cnt>rrd.stat_head->ds_cnt){
252 rrd_set_error("Template contains more DS definitions than RRD");
253 free(updvals); free(pdp_temp);
254 free(tmpl_idx); rrd_free(&rrd);
255 fclose(rrd_file); return(-1);
256 }
257 if ((tmpl_idx[tmpl_cnt++] = ds_match(&rrd,dsname)) == -1){
258 rrd_set_error("unknown DS name '%s'",dsname);
259 free(updvals); free(pdp_temp);
260 free(tmpl_idx); rrd_free(&rrd);
261 fclose(rrd_file); return(-1);
262 } else {
263 /* the first element is always the time */
264 tmpl_idx[tmpl_cnt-1]++;
265 /* go to the next entry on the template */
266 dsname = &template[i+1];
267 /* fix the damage we did before */
268 if (i<tmpl_len) {
269 template[i]=':';
270 }
272 }
273 }
274 }
275 }
276 if ((pdp_new = malloc(sizeof(rrd_value_t)
277 *rrd.stat_head->ds_cnt))==NULL){
278 rrd_set_error("allocating pdp_new ...");
279 free(updvals);
280 free(pdp_temp);
281 free(tmpl_idx);
282 rrd_free(&rrd);
283 fclose(rrd_file);
284 return(-1);
285 }
287 /* loop through the arguments. */
288 for(arg_i=optind+1; arg_i<argc;arg_i++) {
289 char *stepper = malloc((strlen(argv[arg_i])+1)*sizeof(char));
290 char *step_start = stepper;
291 char *p;
292 char *parsetime_error = NULL;
293 enum {atstyle, normal} timesyntax;
294 struct time_value ds_tv;
295 if (stepper == NULL){
296 rrd_set_error("failed duplication argv entry");
297 free(updvals);
298 free(pdp_temp);
299 free(tmpl_idx);
300 rrd_free(&rrd);
301 fclose(rrd_file);
302 return(-1);
303 }
304 /* initialize all ds input to unknown except the first one
305 which has always got to be set */
306 for(ii=1;ii<=rrd.stat_head->ds_cnt;ii++) updvals[ii] = "U";
307 strcpy(stepper,argv[arg_i]);
308 updvals[0]=stepper;
309 /* separate all ds elements; first must be examined separately
310 due to alternate time syntax */
311 if ((p=strchr(stepper,'@'))!=NULL) {
312 timesyntax = atstyle;
313 *p = '\0';
314 stepper = p+1;
315 } else if ((p=strchr(stepper,':'))!=NULL) {
316 timesyntax = normal;
317 *p = '\0';
318 stepper = p+1;
319 } else {
320 rrd_set_error("expected timestamp not found in data source from %s:...",
321 argv[arg_i]);
322 free(step_start);
323 break;
324 }
325 ii=1;
326 updvals[tmpl_idx[ii]] = stepper;
327 while (*stepper) {
328 if (*stepper == ':') {
329 *stepper = '\0';
330 ii++;
331 if (ii<tmpl_cnt){
332 updvals[tmpl_idx[ii]] = stepper+1;
333 }
334 }
335 stepper++;
336 }
338 if (ii != tmpl_cnt-1) {
339 rrd_set_error("expected %lu data source readings (got %lu) from %s:...",
340 tmpl_cnt-1, ii, argv[arg_i]);
341 free(step_start);
342 break;
343 }
345 /* get the time from the reading ... handle N */
346 if (timesyntax == atstyle) {
347 if ((parsetime_error = parsetime(updvals[0], &ds_tv))) {
348 rrd_set_error("ds time: %s: %s", updvals[0], parsetime_error );
349 free(step_start);
350 break;
351 }
352 if (ds_tv.type == RELATIVE_TO_END_TIME ||
353 ds_tv.type == RELATIVE_TO_START_TIME) {
354 rrd_set_error("specifying time relative to the 'start' "
355 "or 'end' makes no sense here: %s",
356 updvals[0]);
357 free(step_start);
358 break;
359 }
361 current_time = mktime(&ds_tv.tm) + ds_tv.offset;
362 } else if (strcmp(updvals[0],"N")==0){
363 current_time = time(NULL);
364 } else {
365 current_time = atol(updvals[0]);
366 }
368 if(current_time <= rrd.live_head->last_up){
369 rrd_set_error("illegal attempt to update using time %ld when "
370 "last update time is %ld (minimum one second step)",
371 current_time, rrd.live_head->last_up);
372 free(step_start);
373 break;
374 }
377 /* seek to the beginning of the rra's */
378 if (rra_current != rra_begin) {
379 if(fseek(rrd_file, rra_begin, SEEK_SET) != 0) {
380 rrd_set_error("seek error in rrd");
381 free(step_start);
382 break;
383 }
384 rra_current = rra_begin;
385 }
386 rra_start = rra_begin;
388 /* when was the current pdp started */
389 proc_pdp_age = rrd.live_head->last_up % rrd.stat_head->pdp_step;
390 proc_pdp_st = rrd.live_head->last_up - proc_pdp_age;
392 /* when did the last pdp_st occur */
393 occu_pdp_age = current_time % rrd.stat_head->pdp_step;
394 occu_pdp_st = current_time - occu_pdp_age;
395 interval = current_time - rrd.live_head->last_up;
397 if (occu_pdp_st > proc_pdp_st){
398 /* OK we passed the pdp_st moment*/
399 pre_int = occu_pdp_st - rrd.live_head->last_up; /* how much of the input data
400 * occurred before the latest
401 * pdp_st moment*/
402 post_int = occu_pdp_age; /* how much after it */
403 } else {
404 pre_int = interval;
405 post_int = 0;
406 }
408 #ifdef DEBUG
409 printf(
410 "proc_pdp_age %lu\t"
411 "proc_pdp_st %lu\t"
412 "occu_pfp_age %lu\t"
413 "occu_pdp_st %lu\t"
414 "int %lu\t"
415 "pre_int %lu\t"
416 "post_int %lu\n", proc_pdp_age, proc_pdp_st,
417 occu_pdp_age, occu_pdp_st,
418 interval, pre_int, post_int);
419 #endif
421 /* process the data sources and update the pdp_prep
422 * area accordingly */
423 for(i=0;i<rrd.stat_head->ds_cnt;i++){
424 enum dst_en dst_idx;
425 dst_idx= dst_conv(rrd.ds_def[i].dst);
/* NOTE: DST_CDEF should never enter this if block, because
 * updvals[i+1][0] is initialized to 'U'; unless the caller
 * accidentally specified a value for the DST_CDEF. To handle
 * this case, an extra check is required. */
430 if((updvals[i+1][0] != 'U') &&
431 (dst_idx != DST_CDEF) &&
432 rrd.ds_def[i].par[DS_mrhb_cnt].u_cnt >= interval) {
433 double rate = DNAN;
434 /* the data source type defines how to process the data */
435 /* pdp_new contains rate * time ... eg the bytes
436 * transferred during the interval. Doing it this way saves
437 * a lot of math operations */
440 switch(dst_idx){
441 case DST_COUNTER:
442 case DST_DERIVE:
443 if(rrd.pdp_prep[i].last_ds[0] != 'U'){
444 pdp_new[i]= rrd_diff(updvals[i+1],rrd.pdp_prep[i].last_ds);
445 if(dst_idx == DST_COUNTER) {
/* simple overflow catcher suggested by Andres Kroonmaa */
447 /* this will fail terribly for non 32 or 64 bit counters ... */
448 /* are there any others in SNMP land ? */
449 if (pdp_new[i] < (double)0.0 )
450 pdp_new[i] += (double)4294967296.0 ; /* 2^32 */
451 if (pdp_new[i] < (double)0.0 )
452 pdp_new[i] += (double)18446744069414584320.0; /* 2^64-2^32 */;
453 }
454 rate = pdp_new[i] / interval;
455 }
456 else {
457 pdp_new[i]= DNAN;
458 }
459 break;
460 case DST_ABSOLUTE:
461 pdp_new[i]= atof(updvals[i+1]);
462 rate = pdp_new[i] / interval;
463 break;
464 case DST_GAUGE:
465 pdp_new[i] = atof(updvals[i+1]) * interval;
466 rate = pdp_new[i] / interval;
467 break;
468 default:
469 rrd_set_error("rrd contains unknown DS type : '%s'",
470 rrd.ds_def[i].dst);
471 break;
472 }
473 /* break out of this for loop if the error string is set */
474 if (rrd_test_error()){
475 break;
476 }
/* make sure the rate is neither too large nor too small;
 * if it is out of range the new value becomes unknown ...
 * sorry folks ... */
480 if ( ! isnan(rate) &&
481 (( ! isnan(rrd.ds_def[i].par[DS_max_val].u_val) &&
482 rate > rrd.ds_def[i].par[DS_max_val].u_val ) ||
483 ( ! isnan(rrd.ds_def[i].par[DS_min_val].u_val) &&
484 rate < rrd.ds_def[i].par[DS_min_val].u_val ))){
485 pdp_new[i] = DNAN;
486 }
487 } else {
488 /* no news is news all the same */
489 pdp_new[i] = DNAN;
490 }
492 /* make a copy of the command line argument for the next run */
493 #ifdef DEBUG
494 fprintf(stderr,
495 "prep ds[%lu]\t"
496 "last_arg '%s'\t"
497 "this_arg '%s'\t"
498 "pdp_new %10.2f\n",
499 i,
500 rrd.pdp_prep[i].last_ds,
501 updvals[i+1], pdp_new[i]);
502 #endif
503 if(dst_idx == DST_COUNTER || dst_idx == DST_DERIVE){
504 strncpy(rrd.pdp_prep[i].last_ds,
505 updvals[i+1],LAST_DS_LEN-1);
506 rrd.pdp_prep[i].last_ds[LAST_DS_LEN-1]='\0';
507 }
508 }
509 /* break out of the argument parsing loop if the error_string is set */
510 if (rrd_test_error()){
511 free(step_start);
512 break;
513 }
514 /* has a pdp_st moment occurred since the last run ? */
516 if (proc_pdp_st == occu_pdp_st){
517 /* no we have not passed a pdp_st moment. therefore update is simple */
519 for(i=0;i<rrd.stat_head->ds_cnt;i++){
520 if(isnan(pdp_new[i]))
521 rrd.pdp_prep[i].scratch[PDP_unkn_sec_cnt].u_cnt += interval;
522 else
523 rrd.pdp_prep[i].scratch[PDP_val].u_val+= pdp_new[i];
524 #ifdef DEBUG
525 fprintf(stderr,
526 "NO PDP ds[%lu]\t"
527 "value %10.2f\t"
528 "unkn_sec %5lu\n",
529 i,
530 rrd.pdp_prep[i].scratch[PDP_val].u_val,
531 rrd.pdp_prep[i].scratch[PDP_unkn_sec_cnt].u_cnt);
532 #endif
533 }
534 } else {
/* a pdp_st has occurred. */
537 /* in pdp_prep[].scratch[PDP_val].u_val we have collected rate*seconds which
538 * occurred up to the last run.
539 pdp_new[] contains rate*seconds from the latest run.
540 pdp_temp[] will contain the rate for cdp */
542 for(i=0;i<rrd.stat_head->ds_cnt;i++){
543 /* update pdp_prep to the current pdp_st */
544 if(isnan(pdp_new[i]))
545 rrd.pdp_prep[i].scratch[PDP_unkn_sec_cnt].u_cnt += pre_int;
546 else
547 rrd.pdp_prep[i].scratch[PDP_val].u_val +=
548 pdp_new[i]/(double)interval*(double)pre_int;
550 /* if too much of the pdp_prep is unknown we dump it */
551 if ((rrd.pdp_prep[i].scratch[PDP_unkn_sec_cnt].u_cnt
552 > rrd.ds_def[i].par[DS_mrhb_cnt].u_cnt) ||
553 (occu_pdp_st-proc_pdp_st <=
554 rrd.pdp_prep[i].scratch[PDP_unkn_sec_cnt].u_cnt)) {
555 pdp_temp[i] = DNAN;
556 } else {
557 pdp_temp[i] = rrd.pdp_prep[i].scratch[PDP_val].u_val
558 / (double)( occu_pdp_st
559 - proc_pdp_st
560 - rrd.pdp_prep[i].scratch[PDP_unkn_sec_cnt].u_cnt);
561 }
563 /* process CDEF data sources; remember each CDEF DS can
564 * only reference other DS with a lower index number */
565 if (dst_conv(rrd.ds_def[i].dst) == DST_CDEF) {
566 rpnp_t *rpnp;
567 rpnp = rpn_expand((rpn_cdefds_t *) &(rrd.ds_def[i].par[DS_cdef]));
/* substitute data values for OP_VARIABLE nodes */
569 for (ii = 0; rpnp[ii].op != OP_END; ii++)
570 {
571 if (rpnp[ii].op == OP_VARIABLE) {
572 rpnp[ii].op = OP_NUMBER;
573 rpnp[ii].val = pdp_temp[rpnp[ii].ptr];
574 }
575 }
576 /* run the rpn calculator */
577 if (rpn_calc(rpnp,&rpnstack,0,pdp_temp,i) == -1) {
578 free(rpnp);
579 break; /* exits the data sources pdp_temp loop */
580 }
581 }
583 /* make pdp_prep ready for the next run */
584 if(isnan(pdp_new[i])){
585 rrd.pdp_prep[i].scratch[PDP_unkn_sec_cnt].u_cnt = post_int;
586 rrd.pdp_prep[i].scratch[PDP_val].u_val = 0.0;
587 } else {
588 rrd.pdp_prep[i].scratch[PDP_unkn_sec_cnt].u_cnt = 0;
589 rrd.pdp_prep[i].scratch[PDP_val].u_val =
590 pdp_new[i]/(double)interval*(double)post_int;
591 }
593 #ifdef DEBUG
594 fprintf(stderr,
595 "PDP UPD ds[%lu]\t"
596 "pdp_temp %10.2f\t"
597 "new_prep %10.2f\t"
598 "new_unkn_sec %5lu\n",
599 i, pdp_temp[i],
600 rrd.pdp_prep[i].scratch[PDP_val].u_val,
601 rrd.pdp_prep[i].scratch[PDP_unkn_sec_cnt].u_cnt);
602 #endif
603 }
605 /* if there were errors during the last loop, bail out here */
606 if (rrd_test_error()){
607 free(step_start);
608 break;
609 }
611 /* compute the number of elapsed pdp_st moments */
612 elapsed_pdp_st = (occu_pdp_st - proc_pdp_st) / rrd.stat_head -> pdp_step;
613 #ifdef DEBUG
614 fprintf(stderr,"elapsed PDP steps: %lu\n", elapsed_pdp_st);
615 #endif
616 if (rra_step_cnt == NULL)
617 {
618 rra_step_cnt = (unsigned long *)
619 malloc((rrd.stat_head->rra_cnt)* sizeof(unsigned long));
620 }
622 for(i = 0, rra_start = rra_begin;
623 i < rrd.stat_head->rra_cnt;
624 rra_start += rrd.rra_def[i].row_cnt * rrd.stat_head -> ds_cnt * sizeof(rrd_value_t),
625 i++)
626 {
627 current_cf = cf_conv(rrd.rra_def[i].cf_nam);
628 start_pdp_offset = rrd.rra_def[i].pdp_cnt -
629 (proc_pdp_st / rrd.stat_head -> pdp_step) % rrd.rra_def[i].pdp_cnt;
630 if (start_pdp_offset <= elapsed_pdp_st) {
631 rra_step_cnt[i] = (elapsed_pdp_st - start_pdp_offset) /
632 rrd.rra_def[i].pdp_cnt + 1;
633 } else {
634 rra_step_cnt[i] = 0;
635 }
637 if (current_cf == CF_SEASONAL || current_cf == CF_DEVSEASONAL)
638 {
/* If this is a bulk update, we need to skip ahead in the seasonal
 * arrays so that they will be correct for the next observed value;
 * note that for the bulk update itself, no update will occur to
 * DEVSEASONAL or SEASONAL; furthermore, HWPREDICT and DEVPREDICT will
 * be set to DNAN. */
644 if (rra_step_cnt[i] > 2)
645 {
646 /* skip update by resetting rra_step_cnt[i],
647 * note that this is not data source specific; this is due
648 * to the bulk update, not a DNAN value for the specific data
649 * source. */
650 rra_step_cnt[i] = 0;
651 lookup_seasonal(&rrd,i,rra_start,rrd_file,elapsed_pdp_st,
652 &last_seasonal_coef);
653 lookup_seasonal(&rrd,i,rra_start,rrd_file,elapsed_pdp_st + 1,
654 &seasonal_coef);
655 }
657 /* periodically run a smoother for seasonal effects */
658 /* Need to use first cdp parameter buffer to track
659 * burnin (burnin requires a specific smoothing schedule).
660 * The CDP_init_seasonal parameter is really an RRA level,
661 * not a data source within RRA level parameter, but the rra_def
662 * is read only for rrd_update (not flushed to disk). */
663 iii = i*(rrd.stat_head -> ds_cnt);
664 if (rrd.cdp_prep[iii].scratch[CDP_init_seasonal].u_cnt
665 <= BURNIN_CYCLES)
666 {
667 if (rrd.rra_ptr[i].cur_row + elapsed_pdp_st
668 > rrd.rra_def[i].row_cnt - 1) {
669 /* mark off one of the burnin cycles */
670 ++(rrd.cdp_prep[iii].scratch[CDP_init_seasonal].u_cnt);
671 schedule_smooth = 1;
672 }
673 } else {
674 /* someone has no doubt invented a trick to deal with this
675 * wrap around, but at least this code is clear. */
676 if (rrd.rra_def[i].par[RRA_seasonal_smooth_idx].u_cnt >
677 rrd.rra_ptr[i].cur_row)
678 {
679 /* here elapsed_pdp_st = rra_step_cnt[i] because of 1-1
680 * mapping between PDP and CDP */
681 if (rrd.rra_ptr[i].cur_row + elapsed_pdp_st
682 >= rrd.rra_def[i].par[RRA_seasonal_smooth_idx].u_cnt)
683 {
684 #ifdef DEBUG
685 fprintf(stderr,
686 "schedule_smooth 1: cur_row %lu, elapsed_pdp_st %lu, smooth idx %lu\n",
687 rrd.rra_ptr[i].cur_row, elapsed_pdp_st,
688 rrd.rra_def[i].par[RRA_seasonal_smooth_idx].u_cnt);
689 #endif
690 schedule_smooth = 1;
691 }
692 } else {
693 /* can't rely on negative numbers because we are working with
694 * unsigned values */
695 /* Don't need modulus here. If we've wrapped more than once, only
696 * one smooth is executed at the end. */
697 if (rrd.rra_ptr[i].cur_row + elapsed_pdp_st >= rrd.rra_def[i].row_cnt
698 && rrd.rra_ptr[i].cur_row + elapsed_pdp_st - rrd.rra_def[i].row_cnt
699 >= rrd.rra_def[i].par[RRA_seasonal_smooth_idx].u_cnt)
700 {
701 #ifdef DEBUG
702 fprintf(stderr,
703 "schedule_smooth 2: cur_row %lu, elapsed_pdp_st %lu, smooth idx %lu\n",
704 rrd.rra_ptr[i].cur_row, elapsed_pdp_st,
705 rrd.rra_def[i].par[RRA_seasonal_smooth_idx].u_cnt);
706 #endif
707 schedule_smooth = 1;
708 }
709 }
710 }
712 rra_current = ftell(rrd_file);
713 } /* if cf is DEVSEASONAL or SEASONAL */
715 if (rrd_test_error()) break;
717 /* update CDP_PREP areas */
/* loop over data sources within each RRA */
719 for(ii = 0;
720 ii < rrd.stat_head->ds_cnt;
721 ii++)
722 {
724 /* iii indexes the CDP prep area for this data source within the RRA */
725 iii=i*rrd.stat_head->ds_cnt+ii;
727 if (rrd.rra_def[i].pdp_cnt > 1) {
729 if (rra_step_cnt[i] > 0) {
/* If we are in this block, at least 1 CDP value will be written to
 * disk, this is the CDP_primary_val entry. If more than 1 value needs
 * to be written, then the "fill in" value is the CDP_secondary_val
 * entry. */
734 if (isnan(pdp_temp[ii]))
735 {
736 rrd.cdp_prep[iii].scratch[CDP_unkn_pdp_cnt].u_cnt += start_pdp_offset;
737 rrd.cdp_prep[iii].scratch[CDP_secondary_val].u_val = DNAN;
738 } else {
739 /* CDP_secondary value is the RRA "fill in" value for intermediary
740 * CDP data entries. No matter the CF, the value is the same because
741 * the average, max, min, and last of a list of identical values is
742 * the same, namely, the value itself. */
743 rrd.cdp_prep[iii].scratch[CDP_secondary_val].u_val = pdp_temp[ii];
744 }
746 if (rrd.cdp_prep[iii].scratch[CDP_unkn_pdp_cnt].u_cnt
747 > rrd.rra_def[i].pdp_cnt*
748 rrd.rra_def[i].par[RRA_cdp_xff_val].u_val)
749 {
750 rrd.cdp_prep[iii].scratch[CDP_primary_val].u_val = DNAN;
751 /* initialize carry over */
752 if (current_cf == CF_AVERAGE) {
753 if (isnan(pdp_temp[ii])) {
754 rrd.cdp_prep[iii].scratch[CDP_val].u_val = DNAN;
755 } else {
756 rrd.cdp_prep[iii].scratch[CDP_val].u_val = pdp_temp[ii] *
757 ((elapsed_pdp_st - start_pdp_offset) % rrd.rra_def[i].pdp_cnt);
758 }
759 } else {
760 rrd.cdp_prep[iii].scratch[CDP_val].u_val = pdp_temp[ii];
761 }
762 } else {
763 rrd_value_t cum_val, cur_val;
764 switch (current_cf) {
765 case CF_AVERAGE:
766 cum_val = IFDNAN(rrd.cdp_prep[iii].scratch[CDP_val].u_val, 0.0);
767 cur_val = IFDNAN(pdp_temp[ii],0.0);
768 rrd.cdp_prep[iii].scratch[CDP_primary_val].u_val =
769 (cum_val + cur_val * start_pdp_offset) /
770 (rrd.rra_def[i].pdp_cnt
771 -rrd.cdp_prep[iii].scratch[CDP_unkn_pdp_cnt].u_cnt);
772 /* initialize carry over value */
773 if (isnan(pdp_temp[ii])) {
774 rrd.cdp_prep[iii].scratch[CDP_val].u_val = DNAN;
775 } else {
776 rrd.cdp_prep[iii].scratch[CDP_val].u_val = pdp_temp[ii] *
777 ((elapsed_pdp_st - start_pdp_offset) % rrd.rra_def[i].pdp_cnt);
778 }
779 break;
780 case CF_MAXIMUM:
781 cum_val = IFDNAN(rrd.cdp_prep[iii].scratch[CDP_val].u_val, -DINF);
782 cur_val = IFDNAN(pdp_temp[ii],-DINF);
783 #ifdef DEBUG
784 if (isnan(rrd.cdp_prep[iii].scratch[CDP_val].u_val) &&
785 isnan(pdp_temp[ii])) {
786 fprintf(stderr,
787 "RRA %lu, DS %lu, both CDP_val and pdp_temp are DNAN!",
788 i,ii);
789 exit(-1);
790 }
791 #endif
792 if (cur_val > cum_val)
793 rrd.cdp_prep[iii].scratch[CDP_primary_val].u_val = cur_val;
794 else
795 rrd.cdp_prep[iii].scratch[CDP_primary_val].u_val = cum_val;
796 /* initialize carry over value */
797 rrd.cdp_prep[iii].scratch[CDP_val].u_val = pdp_temp[ii];
798 break;
799 case CF_MINIMUM:
800 cum_val = IFDNAN(rrd.cdp_prep[iii].scratch[CDP_val].u_val, DINF);
801 cur_val = IFDNAN(pdp_temp[ii],DINF);
802 #ifdef DEBUG
803 if (isnan(rrd.cdp_prep[iii].scratch[CDP_val].u_val) &&
804 isnan(pdp_temp[ii])) {
805 fprintf(stderr,
806 "RRA %lu, DS %lu, both CDP_val and pdp_temp are DNAN!",
807 i,ii);
808 exit(-1);
809 }
810 #endif
811 if (cur_val < cum_val)
812 rrd.cdp_prep[iii].scratch[CDP_primary_val].u_val = cur_val;
813 else
814 rrd.cdp_prep[iii].scratch[CDP_primary_val].u_val = cum_val;
815 /* initialize carry over value */
816 rrd.cdp_prep[iii].scratch[CDP_val].u_val = pdp_temp[ii];
817 break;
818 case CF_LAST:
819 default:
820 rrd.cdp_prep[iii].scratch[CDP_primary_val].u_val = pdp_temp[ii];
821 /* initialize carry over value */
822 rrd.cdp_prep[iii].scratch[CDP_val].u_val = pdp_temp[ii];
823 break;
824 }
825 } /* endif meets xff value requirement for a valid value */
/* initialize carry over CDP_unkn_pdp_cnt; this must happen after
 * CDP_primary_val is set because CDP_unkn_pdp_cnt is required to
 * compute that value. */
828 if (isnan(pdp_temp[ii]))
829 rrd.cdp_prep[iii].scratch[CDP_unkn_pdp_cnt].u_cnt =
830 (elapsed_pdp_st - start_pdp_offset) % rrd.rra_def[i].pdp_cnt;
831 else
832 rrd.cdp_prep[iii].scratch[CDP_unkn_pdp_cnt].u_cnt = 0;
833 } else /* rra_step_cnt[i] == 0 */
834 {
835 #ifdef DEBUG
836 if (isnan(rrd.cdp_prep[iii].scratch[CDP_val].u_val)) {
837 fprintf(stderr,"schedule CDP_val update, RRA %lu DS %lu, DNAN\n",
838 i,ii);
839 } else {
840 fprintf(stderr,"schedule CDP_val update, RRA %lu DS %lu, %10.2f\n",
841 i,ii,rrd.cdp_prep[iii].scratch[CDP_val].u_val);
842 }
843 #endif
844 if (isnan(pdp_temp[ii])) {
845 rrd.cdp_prep[iii].scratch[CDP_unkn_pdp_cnt].u_cnt += elapsed_pdp_st;
846 } else if (isnan(rrd.cdp_prep[iii].scratch[CDP_val].u_val))
847 {
848 if (current_cf == CF_AVERAGE) {
849 rrd.cdp_prep[iii].scratch[CDP_val].u_val = pdp_temp[ii] *
850 elapsed_pdp_st;
851 } else {
852 rrd.cdp_prep[iii].scratch[CDP_val].u_val = pdp_temp[ii];
853 }
854 #ifdef DEBUG
855 fprintf(stderr,"Initialize CDP_val for RRA %lu DS %lu: %10.2f\n",
856 i,ii,rrd.cdp_prep[iii].scratch[CDP_val].u_val);
857 #endif
858 } else {
859 switch (current_cf) {
860 case CF_AVERAGE:
861 rrd.cdp_prep[iii].scratch[CDP_val].u_val += pdp_temp[ii] *
862 elapsed_pdp_st;
863 break;
864 case CF_MINIMUM:
865 if (pdp_temp[ii] < rrd.cdp_prep[iii].scratch[CDP_val].u_val)
866 rrd.cdp_prep[iii].scratch[CDP_val].u_val = pdp_temp[ii];
867 break;
868 case CF_MAXIMUM:
869 if (pdp_temp[ii] > rrd.cdp_prep[iii].scratch[CDP_val].u_val)
870 rrd.cdp_prep[iii].scratch[CDP_val].u_val = pdp_temp[ii];
871 break;
872 case CF_LAST:
873 default:
874 rrd.cdp_prep[iii].scratch[CDP_val].u_val = pdp_temp[ii];
875 break;
876 }
877 }
878 }
879 } else { /* rrd.rra_def[i].pdp_cnt == 1 */
880 if (elapsed_pdp_st > 2)
881 {
882 switch (current_cf) {
883 case CF_AVERAGE:
884 default:
885 rrd.cdp_prep[iii].scratch[CDP_primary_val].u_val=pdp_temp[ii];
886 rrd.cdp_prep[iii].scratch[CDP_secondary_val].u_val=pdp_temp[ii];
887 break;
888 case CF_SEASONAL:
889 case CF_DEVSEASONAL:
890 /* need to update cached seasonal values, so they are consistent
891 * with the bulk update */
892 /* WARNING: code relies on the fact that CDP_hw_last_seasonal and
893 * CDP_last_deviation are the same. */
894 rrd.cdp_prep[iii].scratch[CDP_hw_last_seasonal].u_val =
895 last_seasonal_coef[ii];
896 rrd.cdp_prep[iii].scratch[CDP_hw_seasonal].u_val =
897 seasonal_coef[ii];
898 break;
899 case CF_HWPREDICT:
900 /* need to update the null_count and last_null_count.
901 * even do this for non-DNAN pdp_temp because the
902 * algorithm is not learning from batch updates. */
903 rrd.cdp_prep[iii].scratch[CDP_null_count].u_cnt +=
904 elapsed_pdp_st;
905 rrd.cdp_prep[iii].scratch[CDP_last_null_count].u_cnt +=
906 elapsed_pdp_st - 1;
907 /* fall through */
908 case CF_DEVPREDICT:
909 rrd.cdp_prep[iii].scratch[CDP_primary_val].u_val = DNAN;
910 rrd.cdp_prep[iii].scratch[CDP_secondary_val].u_val = DNAN;
911 break;
912 case CF_FAILURES:
913 /* do not count missed bulk values as failures */
914 rrd.cdp_prep[iii].scratch[CDP_primary_val].u_val = 0;
915 rrd.cdp_prep[iii].scratch[CDP_secondary_val].u_val = 0;
916 /* need to reset violations buffer.
917 * could do this more carefully, but for now, just
918 * assume a bulk update wipes away all violations. */
919 erase_violations(&rrd, iii, i);
920 break;
921 }
922 }
923 } /* endif rrd.rra_def[i].pdp_cnt == 1 */
925 if (rrd_test_error()) break;
927 } /* endif data sources loop */
928 } /* end RRA Loop */
930 /* this loop is only entered if elapsed_pdp_st < 3 */
931 for (j = elapsed_pdp_st, scratch_idx = CDP_primary_val;
932 j > 0 && j < 3; j--, scratch_idx = CDP_secondary_val)
933 {
934 for(i = 0, rra_start = rra_begin;
935 i < rrd.stat_head->rra_cnt;
936 rra_start += rrd.rra_def[i].row_cnt * rrd.stat_head -> ds_cnt * sizeof(rrd_value_t),
937 i++)
938 {
939 if (rrd.rra_def[i].pdp_cnt > 1) continue;
941 current_cf = cf_conv(rrd.rra_def[i].cf_nam);
942 if (current_cf == CF_SEASONAL || current_cf == CF_DEVSEASONAL)
943 {
944 lookup_seasonal(&rrd,i,rra_start,rrd_file,
945 elapsed_pdp_st + (scratch_idx == CDP_primary_val ? 1 : 2),
946 &seasonal_coef);
947 rra_current = ftell(rrd_file);
948 }
949 if (rrd_test_error()) break;
/* loop over data sources within each RRA */
951 for(ii = 0;
952 ii < rrd.stat_head->ds_cnt;
953 ii++)
954 {
955 update_aberrant_CF(&rrd,pdp_temp[ii],current_cf,
956 i*(rrd.stat_head->ds_cnt) + ii,i,ii,
957 scratch_idx, seasonal_coef);
958 }
959 } /* end RRA Loop */
960 if (rrd_test_error()) break;
961 } /* end elapsed_pdp_st loop */
963 if (rrd_test_error()) break;
965 /* Ready to write to disk */
966 /* Move sequentially through the file, writing one RRA at a time.
967 * Note this architecture divorces the computation of CDP with
968 * flushing updated RRA entries to disk. */
969 for(i = 0, rra_start = rra_begin;
970 i < rrd.stat_head->rra_cnt;
971 rra_start += rrd.rra_def[i].row_cnt * rrd.stat_head -> ds_cnt * sizeof(rrd_value_t),
972 i++) {
973 /* is there anything to write for this RRA? If not, continue. */
974 if (rra_step_cnt[i] == 0) continue;
976 /* write the first row */
977 #ifdef DEBUG
978 fprintf(stderr," -- RRA Preseek %ld\n",ftell(rrd_file));
979 #endif
980 rrd.rra_ptr[i].cur_row++;
981 if (rrd.rra_ptr[i].cur_row >= rrd.rra_def[i].row_cnt)
982 rrd.rra_ptr[i].cur_row = 0; /* wrap around */
983 /* positition on the first row */
984 rra_pos_tmp = rra_start +
985 (rrd.stat_head->ds_cnt)*(rrd.rra_ptr[i].cur_row)*sizeof(rrd_value_t);
986 if(rra_pos_tmp != rra_current) {
987 if(fseek(rrd_file, rra_pos_tmp, SEEK_SET) != 0){
988 rrd_set_error("seek error in rrd");
989 break;
990 }
991 rra_current = rra_pos_tmp;
992 }
994 #ifdef DEBUG
995 fprintf(stderr," -- RRA Postseek %ld\n",ftell(rrd_file));
996 #endif
997 scratch_idx = CDP_primary_val;
998 write_RRA_row(&rrd, i, &rra_current, scratch_idx, rrd_file);
999 if (rrd_test_error()) break;
1001 /* write other rows of the bulk update, if any */
1002 scratch_idx = CDP_secondary_val;
1003 for ( ; rra_step_cnt[i] > 1;
1004 rra_step_cnt[i]--, rrd.rra_ptr[i].cur_row++)
1005 {
1006 if (rrd.rra_ptr[i].cur_row == rrd.rra_def[i].row_cnt)
1007 {
1008 #ifdef DEBUG
1009 fprintf(stderr,"Wraparound for RRA %s, %lu updates left\n",
1010 rrd.rra_def[i].cf_nam, rra_step_cnt[i] - 1);
1011 #endif
1012 /* wrap */
1013 rrd.rra_ptr[i].cur_row = 0;
1014 /* seek back to beginning of current rra */
1015 if (fseek(rrd_file, rra_start, SEEK_SET) != 0)
1016 {
1017 rrd_set_error("seek error in rrd");
1018 break;
1019 }
1020 #ifdef DEBUG
1021 fprintf(stderr," -- Wraparound Postseek %ld\n",ftell(rrd_file));
1022 #endif
1023 rra_current = rra_start;
1024 }
1025 write_RRA_row(&rrd, i, &rra_current, scratch_idx, rrd_file);
1026 }
1028 if (rrd_test_error())
1029 break;
1030 } /* RRA LOOP */
1032 /* break out of the argument parsing loop if error_string is set */
1033 if (rrd_test_error()){
1034 free(step_start);
1035 break;
1036 }
1038 } /* endif a pdp_st has occurred */
1039 rrd.live_head->last_up = current_time;
1040 free(step_start);
1041 } /* function argument loop */
1043 if (seasonal_coef != NULL) free(seasonal_coef);
1044 if (last_seasonal_coef != NULL) free(last_seasonal_coef);
1045 if (rra_step_cnt != NULL) free(rra_step_cnt);
1046 rpnstack_free(&rpnstack);
1048 /* if we got here and if there is an error and if the file has not been
1049 * written to, then close things up and return. */
1050 if (rrd_test_error()) {
1051 free(updvals);
1052 free(tmpl_idx);
1053 rrd_free(&rrd);
1054 free(pdp_temp);
1055 free(pdp_new);
1056 fclose(rrd_file);
1057 return(-1);
1058 }
1060 /* aargh ... that was tough ... so many loops ... anyway, its done.
1061 * we just need to write back the live header portion now*/
1063 if (fseek(rrd_file, (sizeof(stat_head_t)
1064 + sizeof(ds_def_t)*rrd.stat_head->ds_cnt
1065 + sizeof(rra_def_t)*rrd.stat_head->rra_cnt),
1066 SEEK_SET) != 0) {
1067 rrd_set_error("seek rrd for live header writeback");
1068 free(updvals);
1069 free(tmpl_idx);
1070 rrd_free(&rrd);
1071 free(pdp_temp);
1072 free(pdp_new);
1073 fclose(rrd_file);
1074 return(-1);
1075 }
1077 if(fwrite( rrd.live_head,
1078 sizeof(live_head_t), 1, rrd_file) != 1){
1079 rrd_set_error("fwrite live_head to rrd");
1080 free(updvals);
1081 rrd_free(&rrd);
1082 free(tmpl_idx);
1083 free(pdp_temp);
1084 free(pdp_new);
1085 fclose(rrd_file);
1086 return(-1);
1087 }
1089 if(fwrite( rrd.pdp_prep,
1090 sizeof(pdp_prep_t),
1091 rrd.stat_head->ds_cnt, rrd_file) != rrd.stat_head->ds_cnt){
1092 rrd_set_error("ftwrite pdp_prep to rrd");
1093 free(updvals);
1094 rrd_free(&rrd);
1095 free(tmpl_idx);
1096 free(pdp_temp);
1097 free(pdp_new);
1098 fclose(rrd_file);
1099 return(-1);
1100 }
1102 if(fwrite( rrd.cdp_prep,
1103 sizeof(cdp_prep_t),
1104 rrd.stat_head->rra_cnt *rrd.stat_head->ds_cnt, rrd_file)
1105 != rrd.stat_head->rra_cnt *rrd.stat_head->ds_cnt){
1107 rrd_set_error("ftwrite cdp_prep to rrd");
1108 free(updvals);
1109 free(tmpl_idx);
1110 rrd_free(&rrd);
1111 free(pdp_temp);
1112 free(pdp_new);
1113 fclose(rrd_file);
1114 return(-1);
1115 }
1117 if(fwrite( rrd.rra_ptr,
1118 sizeof(rra_ptr_t),
1119 rrd.stat_head->rra_cnt,rrd_file) != rrd.stat_head->rra_cnt){
1120 rrd_set_error("fwrite rra_ptr to rrd");
1121 free(updvals);
1122 free(tmpl_idx);
1123 rrd_free(&rrd);
1124 free(pdp_temp);
1125 free(pdp_new);
1126 fclose(rrd_file);
1127 return(-1);
1128 }
1130 /* OK now close the files and free the memory */
1131 if(fclose(rrd_file) != 0){
1132 rrd_set_error("closing rrd");
1133 free(updvals);
1134 free(tmpl_idx);
1135 rrd_free(&rrd);
1136 free(pdp_temp);
1137 free(pdp_new);
1138 return(-1);
1139 }
1141 /* calling the smoothing code here guarantees at most
1142 * one smoothing operation per rrd_update call. Unfortunately,
1143 * it is possible with bulk updates, or a long-delayed update
1144 * for smoothing to occur off-schedule. This really isn't
1145 * critical except during the burning cycles. */
1146 if (schedule_smooth)
1147 {
1148 #ifndef WIN32
1149 rrd_file = fopen(argv[optind],"r+");
1150 #else
1151 rrd_file = fopen(argv[optind],"rb+");
1152 #endif
1153 rra_start = rra_begin;
1154 for (i = 0; i < rrd.stat_head -> rra_cnt; ++i)
1155 {
1156 if (cf_conv(rrd.rra_def[i].cf_nam) == CF_DEVSEASONAL ||
1157 cf_conv(rrd.rra_def[i].cf_nam) == CF_SEASONAL)
1158 {
1159 #ifdef DEBUG
1160 fprintf(stderr,"Running smoother for rra %ld\n",i);
1161 #endif
1162 apply_smoother(&rrd,i,rra_start,rrd_file);
1163 if (rrd_test_error())
1164 break;
1165 }
1166 rra_start += rrd.rra_def[i].row_cnt
1167 *rrd.stat_head->ds_cnt*sizeof(rrd_value_t);
1168 }
1169 fclose(rrd_file);
1170 }
1171 rrd_free(&rrd);
1172 free(updvals);
1173 free(tmpl_idx);
1174 free(pdp_new);
1175 free(pdp_temp);
1176 return(0);
1177 }
/*
 * Try to take an exclusive lock on the whole RRD file without blocking.
 *
 * rrdfile: open stream of the RRD; its descriptor is obtained via fileno().
 *
 * Non-WIN32 builds request a write lock through fcntl(F_SETLK); the region
 * starts at offset 0 measured from the beginning of the file and has
 * length 0, which fcntl() interprets as "through end of file".
 * WIN32 builds ask _locking(_LK_NBLCK) for st_size bytes, or give up with
 * -1 when _fstat() cannot report the file size.
 * NOTE(review): _locking() is documented to work from the current file
 * position -- confirm callers invoke this before moving the file pointer.
 *
 * Nothing in here releases the lock; it is expected to go away when the
 * file is closed.
 *
 * Returns the status of the underlying locking call: 0 on success,
 * -1 on failure.
 */
int
LockRRD(FILE *rrdfile)
{
    int fd = fileno(rrdfile);   /* descriptor behind the stdio stream */
    int rc;                     /* status handed back to the caller */
#ifndef WIN32
    struct flock whole_file;

    whole_file.l_whence = SEEK_SET; /* l_start is relative to start of file */
    whole_file.l_start = 0;         /* begin at the first byte */
    whole_file.l_len = 0;           /* zero length: lock the entire file */
    whole_file.l_type = F_WRLCK;    /* exclusive write lock */

    rc = fcntl(fd, F_SETLK, &whole_file);
#else
    struct _stat info;

    if ( _fstat( fd, &info ) != 0 ) {
        rc = -1;    /* file size unknown, nothing sensible to lock */
    } else {
        rc = _locking ( fd, _LK_NBLCK, info.st_size );
    }
#endif

    return(rc);
}
1217 void
1218 write_RRA_row (rrd_t *rrd, unsigned long rra_idx, unsigned long *rra_current,
1219 unsigned short CDP_scratch_idx, FILE *rrd_file)
1220 {
1221 unsigned long ds_idx, cdp_idx;
1223 for (ds_idx = 0; ds_idx < rrd -> stat_head -> ds_cnt; ds_idx++)
1224 {
1225 /* compute the cdp index */
1226 cdp_idx =rra_idx * (rrd -> stat_head->ds_cnt) + ds_idx;
1227 #ifdef DEBUG
1228 fprintf(stderr," -- RRA WRITE VALUE %e, at %ld CF:%s\n",
1229 rrd -> cdp_prep[cdp_idx].scratch[CDP_scratch_idx].u_val,ftell(rrd_file),
1230 rrd -> rra_def[rra_idx].cf_nam);
1231 #endif
1233 if(fwrite(&(rrd -> cdp_prep[cdp_idx].scratch[CDP_scratch_idx].u_val),
1234 sizeof(rrd_value_t),1,rrd_file) != 1)
1235 {
1236 rrd_set_error("writing rrd");
1237 return;
1238 }
1239 *rra_current += sizeof(rrd_value_t);
1240 }
1241 }