1 /*****************************************************************************
2 * RRDtool 1.1.x Copyright Tobias Oetiker, 1997 - 2002
3 *****************************************************************************
4 * rrd_update.c RRD Update Function
5 *****************************************************************************
6 * $Id$
7 * $Log$
8 * Revision 1.6 2002/02/01 20:34:49 oetiker
9 * fixed version number and date/time
10 *
11 * Revision 1.5 2001/05/09 05:31:01 oetiker
12 * Bug fix: when update of multiple PDP/CDP RRAs coincided
13 * with interpolation of multiple PDPs an incorrect value was
14 * stored as the CDP. Especially evident for GAUGE data sources.
15 * Minor changes to rrdcreate.pod. -- Jake Brutlag <jakeb@corp.webtv.net>
16 *
17 * Revision 1.4 2001/03/10 23:54:41 oetiker
18 * Support for COMPUTE data sources (CDEF data sources). Removes the RPN
19 * parser and calculator from rrd_graph and puts then in a new file,
20 * rrd_rpncalc.c. Changes to core files rrd_create and rrd_update. Some
21 * clean-up of aberrant behavior stuff, including a bug fix.
22 * Documentation update (rrdcreate.pod, rrdupdate.pod). Change xml format.
23 * -- Jake Brutlag <jakeb@corp.webtv.net>
24 *
25 * Revision 1.3 2001/03/04 13:01:55 oetiker
26 * Aberrant Behavior Detection support. A brief overview added to rrdtool.pod.
27 * Major updates to rrd_update.c, rrd_create.c. Minor update to other core files.
28 * This is backwards compatible! But new files using the Aberrant stuff are not readable
29 * by old rrdtool versions. See http://cricket.sourceforge.net/aberrant/rrd_hw.htm
30 * -- Jake Brutlag <jakeb@corp.webtv.net>
31 *
32 * Revision 1.2 2001/03/04 11:14:25 oetiker
33 * added at-style-time@value:value syntax to rrd_update
34 * -- Dave Bodenstab <imdave@mcs.net>
35 *
36 * Revision 1.1.1.1 2001/02/25 22:25:06 oetiker
37 * checkin
38 *
39 *****************************************************************************/
41 #include "rrd_tool.h"
42 #include <sys/types.h>
43 #include <fcntl.h>
44 #include "rrd_hw.h"
45 #include "rrd_rpncalc.h"
47 #ifdef WIN32
48 #include <sys/locking.h>
49 #include <sys/stat.h>
50 #include <io.h>
51 #endif
53 /* Local prototypes */
54 int LockRRD(FILE *rrd_file);
55 void write_RRA_row (rrd_t *rrd, unsigned long rra_idx, unsigned long *rra_current,
56 unsigned short CDP_scratch_idx, FILE *rrd_file);
/* IFDNAN(X,Y): evaluates to Y when X is NaN, otherwise to X.
 *
 * The expansion is a single parenthesized expression with no trailing
 * semicolon, so the macro can be used inside larger expressions; the caller
 * writes the terminating ';' as for any other expression.
 *
 * X may be evaluated more than once (inside isnan() and again as the
 * result), so do not pass an argument that has side effects. */
#define IFDNAN(X,Y) (isnan(X) ? (Y) : (X))
#ifdef STANDALONE
/* Entry point of the stand-alone rrdupdate program; only compiled when
 * STANDALONE is defined.
 *
 * The command line is handed to rrd_update() unchanged.  Success is judged
 * by the RRD error state rather than by rrd_update()'s return value: if an
 * error is pending afterwards, the usage text and the error message are
 * printed on stdout, the error is cleared and the exit status is 1.
 * Otherwise the exit status is 0. */
int
main(int argc, char **argv)
{
    static const char usage_text[] =
        "RRDtool 1.1.x Copyright 1997-2000 by Tobias Oetiker <tobi@oetiker.ch>\n\n"
        "Usage: rrdupdate filename\n"
        "\t\t\t[--template|-t ds-name:ds-name:...]\n"
        "\t\t\ttime|N:value[:value...]\n\n"
        "\t\t\tat-time@value[:value...]\n\n"
        "\t\t\t[ time:value[:value...] ..]\n\n";

    rrd_update(argc, argv);

    /* nothing pending in the error state: the update went through */
    if (!rrd_test_error()) {
        return 0;
    }

    fputs(usage_text, stdout);
    printf("ERROR: %s\n", rrd_get_error());
    rrd_clear_error();
    return 1;
}
#endif
81 int
82 rrd_update(int argc, char **argv)
83 {
85 int arg_i = 2;
86 short j;
87 long i,ii,iii=1;
89 unsigned long rra_begin; /* byte pointer to the rra
90 * area in the rrd file. this
91 * pointer never changes value */
92 unsigned long rra_start; /* byte pointer to the rra
93 * area in the rrd file. this
94 * pointer changes as each rrd is
95 * processed. */
96 unsigned long rra_current; /* byte pointer to the current write
97 * spot in the rrd file. */
98 unsigned long rra_pos_tmp; /* temporary byte pointer. */
99 unsigned long interval,
100 pre_int,post_int; /* interval between this and
101 * the last run */
102 unsigned long proc_pdp_st; /* which pdp_st was the last
103 * to be processed */
104 unsigned long occu_pdp_st; /* when was the pdp_st
105 * before the last update
106 * time */
107 unsigned long proc_pdp_age; /* how old was the data in
108 * the pdp prep area when it
109 * was last updated */
110 unsigned long occu_pdp_age; /* how long ago was the last
111 * pdp_step time */
112 rrd_value_t *pdp_new; /* prepare the incoming data
113 * to be added the the
114 * existing entry */
115 rrd_value_t *pdp_temp; /* prepare the pdp values
116 * to be added the the
117 * cdp values */
119 long *tmpl_idx; /* index representing the settings
120 transported by the template index */
121 long tmpl_cnt = 2; /* time and data */
123 FILE *rrd_file;
124 rrd_t rrd;
125 time_t current_time = time(NULL);
126 char **updvals;
127 int schedule_smooth = 0;
128 char *template = NULL;
129 rrd_value_t *seasonal_coef = NULL, *last_seasonal_coef = NULL;
130 /* a vector of future Holt-Winters seasonal coefs */
131 unsigned long elapsed_pdp_st;
132 /* number of elapsed PDP steps since last update */
133 unsigned long *rra_step_cnt = NULL;
134 /* number of rows to be updated in an RRA for a data
135 * value. */
136 unsigned long start_pdp_offset;
137 /* number of PDP steps since the last update that
138 * are assigned to the first CDP to be generated
139 * since the last update. */
140 unsigned short scratch_idx;
141 /* index into the CDP scratch array */
142 enum cf_en current_cf;
143 /* numeric id of the current consolidation function */
144 rpnstack_t rpnstack; /* used for COMPUTE DS */
146 rpnstack_init(&rpnstack);
148 while (1) {
149 static struct option long_options[] =
150 {
151 {"template", required_argument, 0, 't'},
152 {0,0,0,0}
153 };
154 int option_index = 0;
155 int opt;
156 opt = getopt_long(argc, argv, "t:",
157 long_options, &option_index);
159 if (opt == EOF)
160 break;
162 switch(opt) {
163 case 't':
164 template = optarg;
165 break;
167 case '?':
168 rrd_set_error("unknown option '%s'",argv[optind-1]);
169 rrd_free(&rrd);
170 return(-1);
171 }
172 }
174 /* need at least 2 arguments: filename, data. */
175 if (argc-optind < 2) {
176 rrd_set_error("Not enough arguments");
177 return -1;
178 }
180 if(rrd_open(argv[optind],&rrd_file,&rrd, RRD_READWRITE)==-1){
181 return -1;
182 }
183 rra_current = rra_start = rra_begin = ftell(rrd_file);
/* This is defined in the ANSI C standard, section 7.9.5.3:

   When a file is opened with update mode ('+' as the second
   or third character in the ... list of mode argument
   variables), both input and output may be performed on the
   associated stream. However, ... input may not be directly
   followed by output without an intervening call to a file
   positioning function, unless the input operation encounters
   end-of-file. */
193 fseek(rrd_file, 0, SEEK_CUR);
196 /* get exclusive lock to whole file.
197 * lock gets removed when we close the file.
198 */
199 if (LockRRD(rrd_file) != 0) {
200 rrd_set_error("could not lock RRD");
201 rrd_free(&rrd);
202 fclose(rrd_file);
203 return(-1);
204 }
206 if((updvals = malloc( sizeof(char*) * (rrd.stat_head->ds_cnt+1)))==NULL){
207 rrd_set_error("allocating updvals pointer array");
208 rrd_free(&rrd);
209 fclose(rrd_file);
210 return(-1);
211 }
213 if ((pdp_temp = malloc(sizeof(rrd_value_t)
214 *rrd.stat_head->ds_cnt))==NULL){
215 rrd_set_error("allocating pdp_temp ...");
216 free(updvals);
217 rrd_free(&rrd);
218 fclose(rrd_file);
219 return(-1);
220 }
222 if ((tmpl_idx = malloc(sizeof(unsigned long)
223 *(rrd.stat_head->ds_cnt+1)))==NULL){
224 rrd_set_error("allocating tmpl_idx ...");
225 free(pdp_temp);
226 free(updvals);
227 rrd_free(&rrd);
228 fclose(rrd_file);
229 return(-1);
230 }
231 /* initialize template redirector */
232 /* default config example (assume DS 1 is a CDEF DS)
233 tmpl_idx[0] -> 0; (time)
234 tmpl_idx[1] -> 1; (DS 0)
235 tmpl_idx[2] -> 3; (DS 2)
236 tmpl_idx[3] -> 4; (DS 3) */
237 tmpl_idx[0] = 0; /* time */
238 for (i = 1, ii = 1 ; i <= rrd.stat_head->ds_cnt ; i++)
239 {
240 if (dst_conv(rrd.ds_def[i-1].dst) != DST_CDEF)
241 tmpl_idx[ii++]=i;
242 }
243 tmpl_cnt= ii;
245 if (template) {
246 char *dsname;
247 int tmpl_len;
248 dsname = template;
249 tmpl_cnt = 1; /* the first entry is the time */
250 tmpl_len = strlen(template);
251 for(i=0;i<=tmpl_len ;i++) {
252 if (template[i] == ':' || template[i] == '\0') {
253 template[i] = '\0';
254 if (tmpl_cnt>rrd.stat_head->ds_cnt){
255 rrd_set_error("Template contains more DS definitions than RRD");
256 free(updvals); free(pdp_temp);
257 free(tmpl_idx); rrd_free(&rrd);
258 fclose(rrd_file); return(-1);
259 }
260 if ((tmpl_idx[tmpl_cnt++] = ds_match(&rrd,dsname)) == -1){
261 rrd_set_error("unknown DS name '%s'",dsname);
262 free(updvals); free(pdp_temp);
263 free(tmpl_idx); rrd_free(&rrd);
264 fclose(rrd_file); return(-1);
265 } else {
266 /* the first element is always the time */
267 tmpl_idx[tmpl_cnt-1]++;
268 /* go to the next entry on the template */
269 dsname = &template[i+1];
270 /* fix the damage we did before */
271 if (i<tmpl_len) {
272 template[i]=':';
273 }
275 }
276 }
277 }
278 }
279 if ((pdp_new = malloc(sizeof(rrd_value_t)
280 *rrd.stat_head->ds_cnt))==NULL){
281 rrd_set_error("allocating pdp_new ...");
282 free(updvals);
283 free(pdp_temp);
284 free(tmpl_idx);
285 rrd_free(&rrd);
286 fclose(rrd_file);
287 return(-1);
288 }
290 /* loop through the arguments. */
291 for(arg_i=optind+1; arg_i<argc;arg_i++) {
292 char *stepper = malloc((strlen(argv[arg_i])+1)*sizeof(char));
293 char *step_start = stepper;
294 char *p;
295 char *parsetime_error = NULL;
296 enum {atstyle, normal} timesyntax;
297 struct time_value ds_tv;
298 if (stepper == NULL){
299 rrd_set_error("failed duplication argv entry");
300 free(updvals);
301 free(pdp_temp);
302 free(tmpl_idx);
303 rrd_free(&rrd);
304 fclose(rrd_file);
305 return(-1);
306 }
307 /* initialize all ds input to unknown except the first one
308 which has always got to be set */
309 for(ii=1;ii<=rrd.stat_head->ds_cnt;ii++) updvals[ii] = "U";
310 strcpy(stepper,argv[arg_i]);
311 updvals[0]=stepper;
312 /* separate all ds elements; first must be examined separately
313 due to alternate time syntax */
314 if ((p=strchr(stepper,'@'))!=NULL) {
315 timesyntax = atstyle;
316 *p = '\0';
317 stepper = p+1;
318 } else if ((p=strchr(stepper,':'))!=NULL) {
319 timesyntax = normal;
320 *p = '\0';
321 stepper = p+1;
322 } else {
323 rrd_set_error("expected timestamp not found in data source from %s:...",
324 argv[arg_i]);
325 free(step_start);
326 break;
327 }
328 ii=1;
329 updvals[tmpl_idx[ii]] = stepper;
330 while (*stepper) {
331 if (*stepper == ':') {
332 *stepper = '\0';
333 ii++;
334 if (ii<tmpl_cnt){
335 updvals[tmpl_idx[ii]] = stepper+1;
336 }
337 }
338 stepper++;
339 }
341 if (ii != tmpl_cnt-1) {
342 rrd_set_error("expected %lu data source readings (got %lu) from %s:...",
343 tmpl_cnt-1, ii, argv[arg_i]);
344 free(step_start);
345 break;
346 }
348 /* get the time from the reading ... handle N */
349 if (timesyntax == atstyle) {
350 if ((parsetime_error = parsetime(updvals[0], &ds_tv))) {
351 rrd_set_error("ds time: %s: %s", updvals[0], parsetime_error );
352 free(step_start);
353 break;
354 }
355 if (ds_tv.type == RELATIVE_TO_END_TIME ||
356 ds_tv.type == RELATIVE_TO_START_TIME) {
357 rrd_set_error("specifying time relative to the 'start' "
358 "or 'end' makes no sense here: %s",
359 updvals[0]);
360 free(step_start);
361 break;
362 }
364 current_time = mktime(&ds_tv.tm) + ds_tv.offset;
365 } else if (strcmp(updvals[0],"N")==0){
366 current_time = time(NULL);
367 } else {
368 current_time = atol(updvals[0]);
369 }
371 if(current_time <= rrd.live_head->last_up){
372 rrd_set_error("illegal attempt to update using time %ld when "
373 "last update time is %ld (minimum one second step)",
374 current_time, rrd.live_head->last_up);
375 free(step_start);
376 break;
377 }
380 /* seek to the beginning of the rra's */
381 if (rra_current != rra_begin) {
382 if(fseek(rrd_file, rra_begin, SEEK_SET) != 0) {
383 rrd_set_error("seek error in rrd");
384 free(step_start);
385 break;
386 }
387 rra_current = rra_begin;
388 }
389 rra_start = rra_begin;
391 /* when was the current pdp started */
392 proc_pdp_age = rrd.live_head->last_up % rrd.stat_head->pdp_step;
393 proc_pdp_st = rrd.live_head->last_up - proc_pdp_age;
395 /* when did the last pdp_st occur */
396 occu_pdp_age = current_time % rrd.stat_head->pdp_step;
397 occu_pdp_st = current_time - occu_pdp_age;
398 interval = current_time - rrd.live_head->last_up;
400 if (occu_pdp_st > proc_pdp_st){
401 /* OK we passed the pdp_st moment*/
402 pre_int = occu_pdp_st - rrd.live_head->last_up; /* how much of the input data
403 * occurred before the latest
404 * pdp_st moment*/
405 post_int = occu_pdp_age; /* how much after it */
406 } else {
407 pre_int = interval;
408 post_int = 0;
409 }
411 #ifdef DEBUG
412 printf(
413 "proc_pdp_age %lu\t"
414 "proc_pdp_st %lu\t"
415 "occu_pfp_age %lu\t"
416 "occu_pdp_st %lu\t"
417 "int %lu\t"
418 "pre_int %lu\t"
419 "post_int %lu\n", proc_pdp_age, proc_pdp_st,
420 occu_pdp_age, occu_pdp_st,
421 interval, pre_int, post_int);
422 #endif
424 /* process the data sources and update the pdp_prep
425 * area accordingly */
426 for(i=0;i<rrd.stat_head->ds_cnt;i++){
427 enum dst_en dst_idx;
428 dst_idx= dst_conv(rrd.ds_def[i].dst);
/* NOTE: DST_CDEF should never enter this if block, because
 * updvals[i+1][0] is initialized to 'U', unless the caller
 * accidentally specified a value for the DST_CDEF. To handle
 * this case, an extra check is required. */
433 if((updvals[i+1][0] != 'U') &&
434 (dst_idx != DST_CDEF) &&
435 rrd.ds_def[i].par[DS_mrhb_cnt].u_cnt >= interval) {
436 double rate = DNAN;
437 /* the data source type defines how to process the data */
438 /* pdp_new contains rate * time ... eg the bytes
439 * transferred during the interval. Doing it this way saves
440 * a lot of math operations */
443 switch(dst_idx){
444 case DST_COUNTER:
445 case DST_DERIVE:
446 if(rrd.pdp_prep[i].last_ds[0] != 'U'){
447 pdp_new[i]= rrd_diff(updvals[i+1],rrd.pdp_prep[i].last_ds);
448 if(dst_idx == DST_COUNTER) {
/* simple overflow catcher suggested by andres kroonmaa */
/* this will fail terribly for non 32 or 64 bit counters ... */
/* are there any others in SNMP land ? */
452 if (pdp_new[i] < (double)0.0 )
453 pdp_new[i] += (double)4294967296.0 ; /* 2^32 */
454 if (pdp_new[i] < (double)0.0 )
455 pdp_new[i] += (double)18446744069414584320.0; /* 2^64-2^32 */;
456 }
457 rate = pdp_new[i] / interval;
458 }
459 else {
460 pdp_new[i]= DNAN;
461 }
462 break;
463 case DST_ABSOLUTE:
464 pdp_new[i]= atof(updvals[i+1]);
465 rate = pdp_new[i] / interval;
466 break;
467 case DST_GAUGE:
468 pdp_new[i] = atof(updvals[i+1]) * interval;
469 rate = pdp_new[i] / interval;
470 break;
471 default:
472 rrd_set_error("rrd contains unknown DS type : '%s'",
473 rrd.ds_def[i].dst);
474 break;
475 }
476 /* break out of this for loop if the error string is set */
477 if (rrd_test_error()){
478 break;
479 }
480 /* make sure pdp_temp is neither too large or too small
481 * if any of these occur it becomes unknown ...
482 * sorry folks ... */
483 if ( ! isnan(rate) &&
484 (( ! isnan(rrd.ds_def[i].par[DS_max_val].u_val) &&
485 rate > rrd.ds_def[i].par[DS_max_val].u_val ) ||
486 ( ! isnan(rrd.ds_def[i].par[DS_min_val].u_val) &&
487 rate < rrd.ds_def[i].par[DS_min_val].u_val ))){
488 pdp_new[i] = DNAN;
489 }
490 } else {
491 /* no news is news all the same */
492 pdp_new[i] = DNAN;
493 }
495 /* make a copy of the command line argument for the next run */
496 #ifdef DEBUG
497 fprintf(stderr,
498 "prep ds[%lu]\t"
499 "last_arg '%s'\t"
500 "this_arg '%s'\t"
501 "pdp_new %10.2f\n",
502 i,
503 rrd.pdp_prep[i].last_ds,
504 updvals[i+1], pdp_new[i]);
505 #endif
506 if(dst_idx == DST_COUNTER || dst_idx == DST_DERIVE){
507 strncpy(rrd.pdp_prep[i].last_ds,
508 updvals[i+1],LAST_DS_LEN-1);
509 rrd.pdp_prep[i].last_ds[LAST_DS_LEN-1]='\0';
510 }
511 }
512 /* break out of the argument parsing loop if the error_string is set */
513 if (rrd_test_error()){
514 free(step_start);
515 break;
516 }
517 /* has a pdp_st moment occurred since the last run ? */
519 if (proc_pdp_st == occu_pdp_st){
520 /* no we have not passed a pdp_st moment. therefore update is simple */
522 for(i=0;i<rrd.stat_head->ds_cnt;i++){
523 if(isnan(pdp_new[i]))
524 rrd.pdp_prep[i].scratch[PDP_unkn_sec_cnt].u_cnt += interval;
525 else
526 rrd.pdp_prep[i].scratch[PDP_val].u_val+= pdp_new[i];
527 #ifdef DEBUG
528 fprintf(stderr,
529 "NO PDP ds[%lu]\t"
530 "value %10.2f\t"
531 "unkn_sec %5lu\n",
532 i,
533 rrd.pdp_prep[i].scratch[PDP_val].u_val,
534 rrd.pdp_prep[i].scratch[PDP_unkn_sec_cnt].u_cnt);
535 #endif
536 }
537 } else {
538 /* an pdp_st has occurred. */
540 /* in pdp_prep[].scratch[PDP_val].u_val we have collected rate*seconds which
541 * occurred up to the last run.
542 pdp_new[] contains rate*seconds from the latest run.
543 pdp_temp[] will contain the rate for cdp */
545 for(i=0;i<rrd.stat_head->ds_cnt;i++){
546 /* update pdp_prep to the current pdp_st */
547 if(isnan(pdp_new[i]))
548 rrd.pdp_prep[i].scratch[PDP_unkn_sec_cnt].u_cnt += pre_int;
549 else
550 rrd.pdp_prep[i].scratch[PDP_val].u_val +=
551 pdp_new[i]/(double)interval*(double)pre_int;
553 /* if too much of the pdp_prep is unknown we dump it */
554 if ((rrd.pdp_prep[i].scratch[PDP_unkn_sec_cnt].u_cnt
555 > rrd.ds_def[i].par[DS_mrhb_cnt].u_cnt) ||
556 (occu_pdp_st-proc_pdp_st <=
557 rrd.pdp_prep[i].scratch[PDP_unkn_sec_cnt].u_cnt)) {
558 pdp_temp[i] = DNAN;
559 } else {
560 pdp_temp[i] = rrd.pdp_prep[i].scratch[PDP_val].u_val
561 / (double)( occu_pdp_st
562 - proc_pdp_st
563 - rrd.pdp_prep[i].scratch[PDP_unkn_sec_cnt].u_cnt);
564 }
566 /* process CDEF data sources; remember each CDEF DS can
567 * only reference other DS with a lower index number */
568 if (dst_conv(rrd.ds_def[i].dst) == DST_CDEF) {
569 rpnp_t *rpnp;
570 rpnp = rpn_expand((rpn_cdefds_t *) &(rrd.ds_def[i].par[DS_cdef]));
571 /* substitue data values for OP_VARIABLE nodes */
572 for (ii = 0; rpnp[ii].op != OP_END; ii++)
573 {
574 if (rpnp[ii].op == OP_VARIABLE) {
575 rpnp[ii].op = OP_NUMBER;
576 rpnp[ii].val = pdp_temp[rpnp[ii].ptr];
577 }
578 }
579 /* run the rpn calculator */
580 if (rpn_calc(rpnp,&rpnstack,0,pdp_temp,i) == -1) {
581 free(rpnp);
582 break; /* exits the data sources pdp_temp loop */
583 }
584 }
586 /* make pdp_prep ready for the next run */
587 if(isnan(pdp_new[i])){
588 rrd.pdp_prep[i].scratch[PDP_unkn_sec_cnt].u_cnt = post_int;
589 rrd.pdp_prep[i].scratch[PDP_val].u_val = 0.0;
590 } else {
591 rrd.pdp_prep[i].scratch[PDP_unkn_sec_cnt].u_cnt = 0;
592 rrd.pdp_prep[i].scratch[PDP_val].u_val =
593 pdp_new[i]/(double)interval*(double)post_int;
594 }
596 #ifdef DEBUG
597 fprintf(stderr,
598 "PDP UPD ds[%lu]\t"
599 "pdp_temp %10.2f\t"
600 "new_prep %10.2f\t"
601 "new_unkn_sec %5lu\n",
602 i, pdp_temp[i],
603 rrd.pdp_prep[i].scratch[PDP_val].u_val,
604 rrd.pdp_prep[i].scratch[PDP_unkn_sec_cnt].u_cnt);
605 #endif
606 }
608 /* if there were errors during the last loop, bail out here */
609 if (rrd_test_error()){
610 free(step_start);
611 break;
612 }
614 /* compute the number of elapsed pdp_st moments */
615 elapsed_pdp_st = (occu_pdp_st - proc_pdp_st) / rrd.stat_head -> pdp_step;
616 #ifdef DEBUG
617 fprintf(stderr,"elapsed PDP steps: %lu\n", elapsed_pdp_st);
618 #endif
619 if (rra_step_cnt == NULL)
620 {
621 rra_step_cnt = (unsigned long *)
622 malloc((rrd.stat_head->rra_cnt)* sizeof(unsigned long));
623 }
625 for(i = 0, rra_start = rra_begin;
626 i < rrd.stat_head->rra_cnt;
627 rra_start += rrd.rra_def[i].row_cnt * rrd.stat_head -> ds_cnt * sizeof(rrd_value_t),
628 i++)
629 {
630 current_cf = cf_conv(rrd.rra_def[i].cf_nam);
631 start_pdp_offset = rrd.rra_def[i].pdp_cnt -
632 (proc_pdp_st / rrd.stat_head -> pdp_step) % rrd.rra_def[i].pdp_cnt;
633 if (start_pdp_offset <= elapsed_pdp_st) {
634 rra_step_cnt[i] = (elapsed_pdp_st - start_pdp_offset) /
635 rrd.rra_def[i].pdp_cnt + 1;
636 } else {
637 rra_step_cnt[i] = 0;
638 }
640 if (current_cf == CF_SEASONAL || current_cf == CF_DEVSEASONAL)
641 {
/* If this is a bulk update, we need to skip ahead in the seasonal
 * arrays so that they will be correct for the next observed value;
 * note that for the bulk update itself, no update will occur to
 * DEVSEASONAL or SEASONAL; furthermore, HWPREDICT and DEVPREDICT will
 * be set to DNAN. */
647 if (rra_step_cnt[i] > 2)
648 {
649 /* skip update by resetting rra_step_cnt[i],
650 * note that this is not data source specific; this is due
651 * to the bulk update, not a DNAN value for the specific data
652 * source. */
653 rra_step_cnt[i] = 0;
654 lookup_seasonal(&rrd,i,rra_start,rrd_file,elapsed_pdp_st,
655 &last_seasonal_coef);
656 lookup_seasonal(&rrd,i,rra_start,rrd_file,elapsed_pdp_st + 1,
657 &seasonal_coef);
658 }
660 /* periodically run a smoother for seasonal effects */
661 /* Need to use first cdp parameter buffer to track
662 * burnin (burnin requires a specific smoothing schedule).
663 * The CDP_init_seasonal parameter is really an RRA level,
664 * not a data source within RRA level parameter, but the rra_def
665 * is read only for rrd_update (not flushed to disk). */
666 iii = i*(rrd.stat_head -> ds_cnt);
667 if (rrd.cdp_prep[iii].scratch[CDP_init_seasonal].u_cnt
668 <= BURNIN_CYCLES)
669 {
670 if (rrd.rra_ptr[i].cur_row + elapsed_pdp_st
671 > rrd.rra_def[i].row_cnt - 1) {
672 /* mark off one of the burnin cycles */
673 ++(rrd.cdp_prep[iii].scratch[CDP_init_seasonal].u_cnt);
674 schedule_smooth = 1;
675 }
676 } else {
677 /* someone has no doubt invented a trick to deal with this
678 * wrap around, but at least this code is clear. */
679 if (rrd.rra_def[i].par[RRA_seasonal_smooth_idx].u_cnt >
680 rrd.rra_ptr[i].cur_row)
681 {
682 /* here elapsed_pdp_st = rra_step_cnt[i] because of 1-1
683 * mapping between PDP and CDP */
684 if (rrd.rra_ptr[i].cur_row + elapsed_pdp_st
685 >= rrd.rra_def[i].par[RRA_seasonal_smooth_idx].u_cnt)
686 {
687 #ifdef DEBUG
688 fprintf(stderr,
689 "schedule_smooth 1: cur_row %lu, elapsed_pdp_st %lu, smooth idx %lu\n",
690 rrd.rra_ptr[i].cur_row, elapsed_pdp_st,
691 rrd.rra_def[i].par[RRA_seasonal_smooth_idx].u_cnt);
692 #endif
693 schedule_smooth = 1;
694 }
695 } else {
696 /* can't rely on negative numbers because we are working with
697 * unsigned values */
698 /* Don't need modulus here. If we've wrapped more than once, only
699 * one smooth is executed at the end. */
700 if (rrd.rra_ptr[i].cur_row + elapsed_pdp_st >= rrd.rra_def[i].row_cnt
701 && rrd.rra_ptr[i].cur_row + elapsed_pdp_st - rrd.rra_def[i].row_cnt
702 >= rrd.rra_def[i].par[RRA_seasonal_smooth_idx].u_cnt)
703 {
704 #ifdef DEBUG
705 fprintf(stderr,
706 "schedule_smooth 2: cur_row %lu, elapsed_pdp_st %lu, smooth idx %lu\n",
707 rrd.rra_ptr[i].cur_row, elapsed_pdp_st,
708 rrd.rra_def[i].par[RRA_seasonal_smooth_idx].u_cnt);
709 #endif
710 schedule_smooth = 1;
711 }
712 }
713 }
715 rra_current = ftell(rrd_file);
716 } /* if cf is DEVSEASONAL or SEASONAL */
718 if (rrd_test_error()) break;
720 /* update CDP_PREP areas */
721 /* loop over data soures within each RRA */
722 for(ii = 0;
723 ii < rrd.stat_head->ds_cnt;
724 ii++)
725 {
727 /* iii indexes the CDP prep area for this data source within the RRA */
728 iii=i*rrd.stat_head->ds_cnt+ii;
730 if (rrd.rra_def[i].pdp_cnt > 1) {
732 if (rra_step_cnt[i] > 0) {
733 /* If we are in this block, as least 1 CDP value will be written to
734 * disk, this is the CDP_primary_val entry. If more than 1 value needs
735 * to be written, then the "fill in" value is the CDP_secondary_val
736 * entry. */
737 if (isnan(pdp_temp[ii]))
738 {
739 rrd.cdp_prep[iii].scratch[CDP_unkn_pdp_cnt].u_cnt += start_pdp_offset;
740 rrd.cdp_prep[iii].scratch[CDP_secondary_val].u_val = DNAN;
741 } else {
742 /* CDP_secondary value is the RRA "fill in" value for intermediary
743 * CDP data entries. No matter the CF, the value is the same because
744 * the average, max, min, and last of a list of identical values is
745 * the same, namely, the value itself. */
746 rrd.cdp_prep[iii].scratch[CDP_secondary_val].u_val = pdp_temp[ii];
747 }
749 if (rrd.cdp_prep[iii].scratch[CDP_unkn_pdp_cnt].u_cnt
750 > rrd.rra_def[i].pdp_cnt*
751 rrd.rra_def[i].par[RRA_cdp_xff_val].u_val)
752 {
753 rrd.cdp_prep[iii].scratch[CDP_primary_val].u_val = DNAN;
754 /* initialize carry over */
755 if (current_cf == CF_AVERAGE) {
756 if (isnan(pdp_temp[ii])) {
757 rrd.cdp_prep[iii].scratch[CDP_val].u_val = DNAN;
758 } else {
759 rrd.cdp_prep[iii].scratch[CDP_val].u_val = pdp_temp[ii] *
760 ((elapsed_pdp_st - start_pdp_offset) % rrd.rra_def[i].pdp_cnt);
761 }
762 } else {
763 rrd.cdp_prep[iii].scratch[CDP_val].u_val = pdp_temp[ii];
764 }
765 } else {
766 rrd_value_t cum_val, cur_val;
767 switch (current_cf) {
768 case CF_AVERAGE:
769 cum_val = IFDNAN(rrd.cdp_prep[iii].scratch[CDP_val].u_val, 0.0);
770 cur_val = IFDNAN(pdp_temp[ii],0.0);
771 rrd.cdp_prep[iii].scratch[CDP_primary_val].u_val =
772 (cum_val + cur_val * start_pdp_offset) /
773 (rrd.rra_def[i].pdp_cnt
774 -rrd.cdp_prep[iii].scratch[CDP_unkn_pdp_cnt].u_cnt);
775 /* initialize carry over value */
776 if (isnan(pdp_temp[ii])) {
777 rrd.cdp_prep[iii].scratch[CDP_val].u_val = DNAN;
778 } else {
779 rrd.cdp_prep[iii].scratch[CDP_val].u_val = pdp_temp[ii] *
780 ((elapsed_pdp_st - start_pdp_offset) % rrd.rra_def[i].pdp_cnt);
781 }
782 break;
783 case CF_MAXIMUM:
784 cum_val = IFDNAN(rrd.cdp_prep[iii].scratch[CDP_val].u_val, -DINF);
785 cur_val = IFDNAN(pdp_temp[ii],-DINF);
786 #ifdef DEBUG
787 if (isnan(rrd.cdp_prep[iii].scratch[CDP_val].u_val) &&
788 isnan(pdp_temp[ii])) {
789 fprintf(stderr,
790 "RRA %lu, DS %lu, both CDP_val and pdp_temp are DNAN!",
791 i,ii);
792 exit(-1);
793 }
794 #endif
795 if (cur_val > cum_val)
796 rrd.cdp_prep[iii].scratch[CDP_primary_val].u_val = cur_val;
797 else
798 rrd.cdp_prep[iii].scratch[CDP_primary_val].u_val = cum_val;
799 /* initialize carry over value */
800 rrd.cdp_prep[iii].scratch[CDP_val].u_val = pdp_temp[ii];
801 break;
802 case CF_MINIMUM:
803 cum_val = IFDNAN(rrd.cdp_prep[iii].scratch[CDP_val].u_val, DINF);
804 cur_val = IFDNAN(pdp_temp[ii],DINF);
805 #ifdef DEBUG
806 if (isnan(rrd.cdp_prep[iii].scratch[CDP_val].u_val) &&
807 isnan(pdp_temp[ii])) {
808 fprintf(stderr,
809 "RRA %lu, DS %lu, both CDP_val and pdp_temp are DNAN!",
810 i,ii);
811 exit(-1);
812 }
813 #endif
814 if (cur_val < cum_val)
815 rrd.cdp_prep[iii].scratch[CDP_primary_val].u_val = cur_val;
816 else
817 rrd.cdp_prep[iii].scratch[CDP_primary_val].u_val = cum_val;
818 /* initialize carry over value */
819 rrd.cdp_prep[iii].scratch[CDP_val].u_val = pdp_temp[ii];
820 break;
821 case CF_LAST:
822 default:
823 rrd.cdp_prep[iii].scratch[CDP_primary_val].u_val = pdp_temp[ii];
824 /* initialize carry over value */
825 rrd.cdp_prep[iii].scratch[CDP_val].u_val = pdp_temp[ii];
826 break;
827 }
828 } /* endif meets xff value requirement for a valid value */
829 /* initialize carry over CDP_unkn_pdp_cnt, this must after CDP_primary_val
830 * is set because CDP_unkn_pdp_cnt is required to compute that value. */
831 if (isnan(pdp_temp[ii]))
832 rrd.cdp_prep[iii].scratch[CDP_unkn_pdp_cnt].u_cnt =
833 (elapsed_pdp_st - start_pdp_offset) % rrd.rra_def[i].pdp_cnt;
834 else
835 rrd.cdp_prep[iii].scratch[CDP_unkn_pdp_cnt].u_cnt = 0;
836 } else /* rra_step_cnt[i] == 0 */
837 {
838 #ifdef DEBUG
839 if (isnan(rrd.cdp_prep[iii].scratch[CDP_val].u_val)) {
840 fprintf(stderr,"schedule CDP_val update, RRA %lu DS %lu, DNAN\n",
841 i,ii);
842 } else {
843 fprintf(stderr,"schedule CDP_val update, RRA %lu DS %lu, %10.2f\n",
844 i,ii,rrd.cdp_prep[iii].scratch[CDP_val].u_val);
845 }
846 #endif
847 if (isnan(pdp_temp[ii])) {
848 rrd.cdp_prep[iii].scratch[CDP_unkn_pdp_cnt].u_cnt += elapsed_pdp_st;
849 } else if (isnan(rrd.cdp_prep[iii].scratch[CDP_val].u_val))
850 {
851 if (current_cf == CF_AVERAGE) {
852 rrd.cdp_prep[iii].scratch[CDP_val].u_val = pdp_temp[ii] *
853 elapsed_pdp_st;
854 } else {
855 rrd.cdp_prep[iii].scratch[CDP_val].u_val = pdp_temp[ii];
856 }
857 #ifdef DEBUG
858 fprintf(stderr,"Initialize CDP_val for RRA %lu DS %lu: %10.2f\n",
859 i,ii,rrd.cdp_prep[iii].scratch[CDP_val].u_val);
860 #endif
861 } else {
862 switch (current_cf) {
863 case CF_AVERAGE:
864 rrd.cdp_prep[iii].scratch[CDP_val].u_val += pdp_temp[ii] *
865 elapsed_pdp_st;
866 break;
867 case CF_MINIMUM:
868 if (pdp_temp[ii] < rrd.cdp_prep[iii].scratch[CDP_val].u_val)
869 rrd.cdp_prep[iii].scratch[CDP_val].u_val = pdp_temp[ii];
870 break;
871 case CF_MAXIMUM:
872 if (pdp_temp[ii] > rrd.cdp_prep[iii].scratch[CDP_val].u_val)
873 rrd.cdp_prep[iii].scratch[CDP_val].u_val = pdp_temp[ii];
874 break;
875 case CF_LAST:
876 default:
877 rrd.cdp_prep[iii].scratch[CDP_val].u_val = pdp_temp[ii];
878 break;
879 }
880 }
881 }
882 } else { /* rrd.rra_def[i].pdp_cnt == 1 */
883 if (elapsed_pdp_st > 2)
884 {
885 switch (current_cf) {
886 case CF_AVERAGE:
887 default:
888 rrd.cdp_prep[iii].scratch[CDP_primary_val].u_val=pdp_temp[ii];
889 rrd.cdp_prep[iii].scratch[CDP_secondary_val].u_val=pdp_temp[ii];
890 break;
891 case CF_SEASONAL:
892 case CF_DEVSEASONAL:
893 /* need to update cached seasonal values, so they are consistent
894 * with the bulk update */
895 /* WARNING: code relies on the fact that CDP_hw_last_seasonal and
896 * CDP_last_deviation are the same. */
897 rrd.cdp_prep[iii].scratch[CDP_hw_last_seasonal].u_val =
898 last_seasonal_coef[ii];
899 rrd.cdp_prep[iii].scratch[CDP_hw_seasonal].u_val =
900 seasonal_coef[ii];
901 break;
902 case CF_HWPREDICT:
903 /* need to update the null_count and last_null_count.
904 * even do this for non-DNAN pdp_temp because the
905 * algorithm is not learning from batch updates. */
906 rrd.cdp_prep[iii].scratch[CDP_null_count].u_cnt +=
907 elapsed_pdp_st;
908 rrd.cdp_prep[iii].scratch[CDP_last_null_count].u_cnt +=
909 elapsed_pdp_st - 1;
910 /* fall through */
911 case CF_DEVPREDICT:
912 rrd.cdp_prep[iii].scratch[CDP_primary_val].u_val = DNAN;
913 rrd.cdp_prep[iii].scratch[CDP_secondary_val].u_val = DNAN;
914 break;
915 case CF_FAILURES:
916 /* do not count missed bulk values as failures */
917 rrd.cdp_prep[iii].scratch[CDP_primary_val].u_val = 0;
918 rrd.cdp_prep[iii].scratch[CDP_secondary_val].u_val = 0;
919 /* need to reset violations buffer.
920 * could do this more carefully, but for now, just
921 * assume a bulk update wipes away all violations. */
922 erase_violations(&rrd, iii, i);
923 break;
924 }
925 }
926 } /* endif rrd.rra_def[i].pdp_cnt == 1 */
928 if (rrd_test_error()) break;
930 } /* endif data sources loop */
931 } /* end RRA Loop */
933 /* this loop is only entered if elapsed_pdp_st < 3 */
934 for (j = elapsed_pdp_st, scratch_idx = CDP_primary_val;
935 j > 0 && j < 3; j--, scratch_idx = CDP_secondary_val)
936 {
937 for(i = 0, rra_start = rra_begin;
938 i < rrd.stat_head->rra_cnt;
939 rra_start += rrd.rra_def[i].row_cnt * rrd.stat_head -> ds_cnt * sizeof(rrd_value_t),
940 i++)
941 {
942 if (rrd.rra_def[i].pdp_cnt > 1) continue;
944 current_cf = cf_conv(rrd.rra_def[i].cf_nam);
945 if (current_cf == CF_SEASONAL || current_cf == CF_DEVSEASONAL)
946 {
947 lookup_seasonal(&rrd,i,rra_start,rrd_file,
948 elapsed_pdp_st + (scratch_idx == CDP_primary_val ? 1 : 2),
949 &seasonal_coef);
950 rra_current = ftell(rrd_file);
951 }
952 if (rrd_test_error()) break;
953 /* loop over data soures within each RRA */
954 for(ii = 0;
955 ii < rrd.stat_head->ds_cnt;
956 ii++)
957 {
958 update_aberrant_CF(&rrd,pdp_temp[ii],current_cf,
959 i*(rrd.stat_head->ds_cnt) + ii,i,ii,
960 scratch_idx, seasonal_coef);
961 }
962 } /* end RRA Loop */
963 if (rrd_test_error()) break;
964 } /* end elapsed_pdp_st loop */
966 if (rrd_test_error()) break;
968 /* Ready to write to disk */
969 /* Move sequentially through the file, writing one RRA at a time.
970 * Note this architecture divorces the computation of CDP with
971 * flushing updated RRA entries to disk. */
972 for(i = 0, rra_start = rra_begin;
973 i < rrd.stat_head->rra_cnt;
974 rra_start += rrd.rra_def[i].row_cnt * rrd.stat_head -> ds_cnt * sizeof(rrd_value_t),
975 i++) {
976 /* is there anything to write for this RRA? If not, continue. */
977 if (rra_step_cnt[i] == 0) continue;
979 /* write the first row */
980 #ifdef DEBUG
981 fprintf(stderr," -- RRA Preseek %ld\n",ftell(rrd_file));
982 #endif
983 rrd.rra_ptr[i].cur_row++;
984 if (rrd.rra_ptr[i].cur_row >= rrd.rra_def[i].row_cnt)
985 rrd.rra_ptr[i].cur_row = 0; /* wrap around */
986 /* positition on the first row */
987 rra_pos_tmp = rra_start +
988 (rrd.stat_head->ds_cnt)*(rrd.rra_ptr[i].cur_row)*sizeof(rrd_value_t);
989 if(rra_pos_tmp != rra_current) {
990 if(fseek(rrd_file, rra_pos_tmp, SEEK_SET) != 0){
991 rrd_set_error("seek error in rrd");
992 break;
993 }
994 rra_current = rra_pos_tmp;
995 }
997 #ifdef DEBUG
998 fprintf(stderr," -- RRA Postseek %ld\n",ftell(rrd_file));
999 #endif
1000 scratch_idx = CDP_primary_val;
1001 write_RRA_row(&rrd, i, &rra_current, scratch_idx, rrd_file);
1002 if (rrd_test_error()) break;
1004 /* write other rows of the bulk update, if any */
1005 scratch_idx = CDP_secondary_val;
1006 for ( ; rra_step_cnt[i] > 1;
1007 rra_step_cnt[i]--, rrd.rra_ptr[i].cur_row++)
1008 {
1009 if (rrd.rra_ptr[i].cur_row == rrd.rra_def[i].row_cnt)
1010 {
1011 #ifdef DEBUG
1012 fprintf(stderr,"Wraparound for RRA %s, %lu updates left\n",
1013 rrd.rra_def[i].cf_nam, rra_step_cnt[i] - 1);
1014 #endif
1015 /* wrap */
1016 rrd.rra_ptr[i].cur_row = 0;
1017 /* seek back to beginning of current rra */
1018 if (fseek(rrd_file, rra_start, SEEK_SET) != 0)
1019 {
1020 rrd_set_error("seek error in rrd");
1021 break;
1022 }
1023 #ifdef DEBUG
1024 fprintf(stderr," -- Wraparound Postseek %ld\n",ftell(rrd_file));
1025 #endif
1026 rra_current = rra_start;
1027 }
1028 write_RRA_row(&rrd, i, &rra_current, scratch_idx, rrd_file);
1029 }
1031 if (rrd_test_error())
1032 break;
1033 } /* RRA LOOP */
1035 /* break out of the argument parsing loop if error_string is set */
1036 if (rrd_test_error()){
1037 free(step_start);
1038 break;
1039 }
1041 } /* endif a pdp_st has occurred */
1042 rrd.live_head->last_up = current_time;
1043 free(step_start);
1044 } /* function argument loop */
1046 if (seasonal_coef != NULL) free(seasonal_coef);
1047 if (last_seasonal_coef != NULL) free(last_seasonal_coef);
1048 if (rra_step_cnt != NULL) free(rra_step_cnt);
1049 rpnstack_free(&rpnstack);
1051 /* if we got here and if there is an error and if the file has not been
1052 * written to, then close things up and return. */
1053 if (rrd_test_error()) {
1054 free(updvals);
1055 free(tmpl_idx);
1056 rrd_free(&rrd);
1057 free(pdp_temp);
1058 free(pdp_new);
1059 fclose(rrd_file);
1060 return(-1);
1061 }
1063 /* aargh ... that was tough ... so many loops ... anyway, its done.
1064 * we just need to write back the live header portion now*/
1066 if (fseek(rrd_file, (sizeof(stat_head_t)
1067 + sizeof(ds_def_t)*rrd.stat_head->ds_cnt
1068 + sizeof(rra_def_t)*rrd.stat_head->rra_cnt),
1069 SEEK_SET) != 0) {
1070 rrd_set_error("seek rrd for live header writeback");
1071 free(updvals);
1072 free(tmpl_idx);
1073 rrd_free(&rrd);
1074 free(pdp_temp);
1075 free(pdp_new);
1076 fclose(rrd_file);
1077 return(-1);
1078 }
1080 if(fwrite( rrd.live_head,
1081 sizeof(live_head_t), 1, rrd_file) != 1){
1082 rrd_set_error("fwrite live_head to rrd");
1083 free(updvals);
1084 rrd_free(&rrd);
1085 free(tmpl_idx);
1086 free(pdp_temp);
1087 free(pdp_new);
1088 fclose(rrd_file);
1089 return(-1);
1090 }
1092 if(fwrite( rrd.pdp_prep,
1093 sizeof(pdp_prep_t),
1094 rrd.stat_head->ds_cnt, rrd_file) != rrd.stat_head->ds_cnt){
1095 rrd_set_error("ftwrite pdp_prep to rrd");
1096 free(updvals);
1097 rrd_free(&rrd);
1098 free(tmpl_idx);
1099 free(pdp_temp);
1100 free(pdp_new);
1101 fclose(rrd_file);
1102 return(-1);
1103 }
1105 if(fwrite( rrd.cdp_prep,
1106 sizeof(cdp_prep_t),
1107 rrd.stat_head->rra_cnt *rrd.stat_head->ds_cnt, rrd_file)
1108 != rrd.stat_head->rra_cnt *rrd.stat_head->ds_cnt){
1110 rrd_set_error("ftwrite cdp_prep to rrd");
1111 free(updvals);
1112 free(tmpl_idx);
1113 rrd_free(&rrd);
1114 free(pdp_temp);
1115 free(pdp_new);
1116 fclose(rrd_file);
1117 return(-1);
1118 }
1120 if(fwrite( rrd.rra_ptr,
1121 sizeof(rra_ptr_t),
1122 rrd.stat_head->rra_cnt,rrd_file) != rrd.stat_head->rra_cnt){
1123 rrd_set_error("fwrite rra_ptr to rrd");
1124 free(updvals);
1125 free(tmpl_idx);
1126 rrd_free(&rrd);
1127 free(pdp_temp);
1128 free(pdp_new);
1129 fclose(rrd_file);
1130 return(-1);
1131 }
1133 /* OK now close the files and free the memory */
1134 if(fclose(rrd_file) != 0){
1135 rrd_set_error("closing rrd");
1136 free(updvals);
1137 free(tmpl_idx);
1138 rrd_free(&rrd);
1139 free(pdp_temp);
1140 free(pdp_new);
1141 return(-1);
1142 }
1144 /* calling the smoothing code here guarantees at most
1145 * one smoothing operation per rrd_update call. Unfortunately,
1146 * it is possible with bulk updates, or a long-delayed update
1147 * for smoothing to occur off-schedule. This really isn't
1148 * critical except during the burning cycles. */
1149 if (schedule_smooth)
1150 {
1151 #ifndef WIN32
1152 rrd_file = fopen(argv[optind],"r+");
1153 #else
1154 rrd_file = fopen(argv[optind],"rb+");
1155 #endif
1156 rra_start = rra_begin;
1157 for (i = 0; i < rrd.stat_head -> rra_cnt; ++i)
1158 {
1159 if (cf_conv(rrd.rra_def[i].cf_nam) == CF_DEVSEASONAL ||
1160 cf_conv(rrd.rra_def[i].cf_nam) == CF_SEASONAL)
1161 {
1162 #ifdef DEBUG
1163 fprintf(stderr,"Running smoother for rra %ld\n",i);
1164 #endif
1165 apply_smoother(&rrd,i,rra_start,rrd_file);
1166 if (rrd_test_error())
1167 break;
1168 }
1169 rra_start += rrd.rra_def[i].row_cnt
1170 *rrd.stat_head->ds_cnt*sizeof(rrd_value_t);
1171 }
1172 fclose(rrd_file);
1173 }
1174 rrd_free(&rrd);
1175 free(updvals);
1176 free(tmpl_idx);
1177 free(pdp_new);
1178 free(pdp_temp);
1179 return(0);
1180 }
/*
 * LockRRD - request an exclusive lock on an open RRD file.
 *
 * rrdfile: open stdio stream of the RRD; its descriptor is obtained
 *          with fileno().
 *
 * On non-Windows builds a write lock (F_WRLCK) is requested through
 * fcntl(F_SETLK) for the range starting at offset 0 (measured from the
 * start of the file, SEEK_SET) with length 0, i.e. through end of file.
 * On WIN32 builds the file size is read with _fstat() and that many
 * bytes are locked with _locking(_LK_NBLCK).
 *
 * The lock is not released here; it goes away when the file is closed.
 *
 * Returns the status of the locking call (0 on success).  On WIN32, -1
 * is returned without attempting the lock if _fstat() fails.
 *
 * NOTE(review): _locking() is documented to lock bytes starting at the
 * current file position, so the WIN32 path covers the whole file only
 * when the stream is positioned at offset 0 -- confirm against callers.
 */
int
LockRRD(FILE *rrdfile)
{
    int fd = fileno(rrdfile);   /* descriptor behind the stdio stream */
    int rc;                     /* status handed back to the caller */

#ifdef WIN32
    struct _stat info;

    if (_fstat(fd, &info) != 0) {
        /* cannot determine how many bytes to lock */
        rc = -1;
    } else {
        rc = _locking(fd, _LK_NBLCK, info.st_size);
    }
#else
    struct flock whole_file;

    whole_file.l_whence = SEEK_SET;   /* l_start counts from start of file */
    whole_file.l_start  = 0;          /* begin at the first byte */
    whole_file.l_len    = 0;          /* 0 == up to end of file */
    whole_file.l_type   = F_WRLCK;    /* exclusive write lock */

    rc = fcntl(fd, F_SETLK, &whole_file);
#endif

    return rc;
}
1220 void
1221 write_RRA_row (rrd_t *rrd, unsigned long rra_idx, unsigned long *rra_current,
1222 unsigned short CDP_scratch_idx, FILE *rrd_file)
1223 {
1224 unsigned long ds_idx, cdp_idx;
1226 for (ds_idx = 0; ds_idx < rrd -> stat_head -> ds_cnt; ds_idx++)
1227 {
1228 /* compute the cdp index */
1229 cdp_idx =rra_idx * (rrd -> stat_head->ds_cnt) + ds_idx;
1230 #ifdef DEBUG
1231 fprintf(stderr," -- RRA WRITE VALUE %e, at %ld CF:%s\n",
1232 rrd -> cdp_prep[cdp_idx].scratch[CDP_scratch_idx].u_val,ftell(rrd_file),
1233 rrd -> rra_def[rra_idx].cf_nam);
1234 #endif
1236 if(fwrite(&(rrd -> cdp_prep[cdp_idx].scratch[CDP_scratch_idx].u_val),
1237 sizeof(rrd_value_t),1,rrd_file) != 1)
1238 {
1239 rrd_set_error("writing rrd");
1240 return;
1241 }
1242 *rra_current += sizeof(rrd_value_t);
1243 }
1244 }