9e8063e35733388295d9223a8df704092a3eccaf
1 /*****************************************************************************
2 * RRDtool 1.1.x Copyright Tobias Oetiker, 1997 - 2002
3 *****************************************************************************
4 * rrd_update.c RRD Update Function
5 *****************************************************************************
6 * $Id$
7 * $Log$
8 * Revision 1.9 2003/04/25 18:35:08 jake
9 * Alternate update interface, updatev. Returns info about CDPs written to disk as result of update. Output format is similar to rrd_info, a hash of key-values.
10 *
11 * Revision 1.8 2003/03/31 21:22:12 oetiker
12 * enables RRDtool updates with microsecond or in case of windows millisecond
13 * precision. This is needed to reduce time measurement error when archive step
14 * is small. (<30s) -- Sasha Mikheev <sasha@avalon-net.co.il>
15 *
16 * Revision 1.7 2003/02/13 07:05:27 oetiker
17 * Find attached the patch I promised to send to you. Please note that there
18 * are three new source files (src/rrd_is_thread_safe.h, src/rrd_thread_safe.c
19 * and src/rrd_not_thread_safe.c) and the introduction of librrd_th. This
20 * library is identical to librrd, but it contains support code for per-thread
21 * global variables currently used for error information only. This is similar
22 * to how errno per-thread variables are implemented. librrd_th must be linked
23 * alongside of libpthred
24 *
25 * There is also a new file "THREADS", holding some documentation.
26 *
27 * -- Peter Stamfest <peter@stamfest.at>
28 *
29 * Revision 1.6 2002/02/01 20:34:49 oetiker
30 * fixed version number and date/time
31 *
32 * Revision 1.5 2001/05/09 05:31:01 oetiker
33 * Bug fix: when update of multiple PDP/CDP RRAs coincided
34 * with interpolation of multiple PDPs an incorrect value was
35 * stored as the CDP. Especially evident for GAUGE data sources.
36 * Minor changes to rrdcreate.pod. -- Jake Brutlag <jakeb@corp.webtv.net>
37 *
38 * Revision 1.4 2001/03/10 23:54:41 oetiker
39 * Support for COMPUTE data sources (CDEF data sources). Removes the RPN
40 * parser and calculator from rrd_graph and puts then in a new file,
41 * rrd_rpncalc.c. Changes to core files rrd_create and rrd_update. Some
42 * clean-up of aberrant behavior stuff, including a bug fix.
43 * Documentation update (rrdcreate.pod, rrdupdate.pod). Change xml format.
44 * -- Jake Brutlag <jakeb@corp.webtv.net>
45 *
46 * Revision 1.3 2001/03/04 13:01:55 oetiker
47 * Aberrant Behavior Detection support. A brief overview added to rrdtool.pod.
48 * Major updates to rrd_update.c, rrd_create.c. Minor update to other core files.
49 * This is backwards compatible! But new files using the Aberrant stuff are not readable
50 * by old rrdtool versions. See http://cricket.sourceforge.net/aberrant/rrd_hw.htm
51 * -- Jake Brutlag <jakeb@corp.webtv.net>
52 *
53 * Revision 1.2 2001/03/04 11:14:25 oetiker
54 * added at-style-time@value:value syntax to rrd_update
55 * -- Dave Bodenstab <imdave@mcs.net>
56 *
57 * Revision 1.1.1.1 2001/02/25 22:25:06 oetiker
58 * checkin
59 *
60 *****************************************************************************/
62 #include "rrd_tool.h"
63 #include <sys/types.h>
64 #include <fcntl.h>
66 #ifdef WIN32
67 #include <sys/locking.h>
68 #include <sys/stat.h>
69 #include <io.h>
70 #endif
72 #include "rrd_hw.h"
73 #include "rrd_rpncalc.h"
75 #include "rrd_is_thread_safe.h"
77 #ifdef WIN32
78 /*
79 * WIN32 does not have gettimeofday and struct timeval. This is a quick and dirty
80 * replacement.
81 */
82 #include <sys/timeb.h>
/* Local definition of struct timeval for WIN32 builds, which do not
 * provide one (see the note at the top of this WIN32 section). */
struct timeval {
    time_t tv_sec;      /* seconds */
    long   tv_usec;     /* microseconds */
};
/* Time zone argument type of the WIN32 gettimeofday() replacement in
 * this file. */
struct __timezone {
    int tz_minuteswest;     /* minutes W of Greenwich */
    int tz_dsttime;         /* type of dst correction */
};
94 static gettimeofday(struct timeval *t, struct __timezone *tz) {
96 struct timeb current_time;
98 _ftime(¤t_time);
100 t->tv_sec = current_time.time;
101 t->tv_usec = current_time.millitm * 1000;
102 }
104 #endif
/*
 * Normalize a time value as returned by gettimeofday so that its
 * microsecond part is not negative: when tv_usec is below zero, one
 * second is borrowed from tv_sec and added to tv_usec as 1000000
 * microseconds. A value whose tv_usec is already >= 0 is left untouched.
 */
static void normalize_time(struct timeval *t)
{
    if (t->tv_usec >= 0)
        return;

    t->tv_sec  -= 1;
    t->tv_usec += 1000000L;
}
117 /* Local prototypes */
118 int LockRRD(FILE *rrd_file);
119 info_t *write_RRA_row (rrd_t *rrd, unsigned long rra_idx,
120 unsigned long *rra_current,
121 unsigned short CDP_scratch_idx, FILE *rrd_file,
122 info_t *pcdp_summary, time_t *rra_time);
123 int rrd_update_r(char *filename, char *template, int argc, char **argv);
124 int _rrd_update(char *filename, char *template, int argc, char **argv,
125 info_t*);
/* IFDNAN(X,Y): yields X, or Y when X is NaN.
 * X is evaluated twice, so it must not have side effects.
 * The expansion deliberately carries no trailing semicolon so the macro
 * can be used as a sub-expression; statement uses supply their own ';'. */
#define IFDNAN(X,Y) (isnan(X) ? (Y) : (X))
130 #ifdef STANDALONE
/*
 * Entry point of the stand-alone rrdupdate program (this code sits inside
 * the STANDALONE conditional).
 *
 * Passes the command line to rrd_update(). If an error is pending
 * afterwards, prints the usage text followed by the error message, clears
 * the error and returns 1; otherwise returns 0.
 */
int
main(int argc, char **argv)
{
    rrd_update(argc, argv);

    /* success: nothing to report */
    if (!rrd_test_error())
        return 0;

    printf("RRDtool 1.1.x Copyright 1997-2000 by Tobias Oetiker <tobi@oetiker.ch>\n\n"
           "Usage: rrdupdate filename\n"
           "\t\t\t[--template|-t ds-name:ds-name:...]\n"
           "\t\t\ttime|N:value[:value...]\n\n"
           "\t\t\tat-time@value[:value...]\n\n"
           "\t\t\t[ time:value[:value...] ..]\n\n");
    printf("ERROR: %s\n", rrd_get_error());

    rrd_clear_error();
    return 1;
}
148 #endif
150 info_t *rrd_update_v(int argc, char **argv)
151 {
152 char *template = NULL;
153 info_t *result = NULL;
154 infoval rc;
156 while (1) {
157 static struct option long_options[] =
158 {
159 {"template", required_argument, 0, 't'},
160 {0,0,0,0}
161 };
162 int option_index = 0;
163 int opt;
164 opt = getopt_long(argc, argv, "t:",
165 long_options, &option_index);
167 if (opt == EOF)
168 break;
170 switch(opt) {
171 case 't':
172 template = optarg;
173 break;
175 case '?':
176 rrd_set_error("unknown option '%s'",argv[optind-1]);
177 rc.u_int = -1;
178 goto end_tag;
179 }
180 }
182 /* need at least 2 arguments: filename, data. */
183 if (argc-optind < 2) {
184 rrd_set_error("Not enough arguments");
185 rc.u_int = -1;
186 goto end_tag;
187 }
188 result = info_push(NULL,sprintf_alloc("return_value"),RD_I_INT,rc);
189 rc.u_int = _rrd_update(argv[optind], template,
190 argc - optind - 1, argv + optind + 1, result);
191 result->value.u_int = rc.u_int;
192 end_tag:
193 return result;
194 }
/*
 * Command-line style update interface.
 *
 * Command line: [--template|-t ds-name:...] filename data [data ...]
 *
 * Parses the optional template, then forwards the file name and the
 * remaining data arguments to rrd_update_r(). Returns -1 (with the reason
 * recorded via rrd_set_error()) for an unknown option or when fewer than
 * two non-option arguments are present; otherwise returns whatever
 * rrd_update_r() returns.
 */
int
rrd_update(int argc, char **argv)
{
    static struct option long_options[] = {
        {"template", required_argument, 0, 't'},
        {0, 0, 0, 0}
    };
    char *template = NULL;

    for (;;) {
        int option_index = 0;
        int opt = getopt_long(argc, argv, "t:",
                              long_options, &option_index);

        if (opt == EOF)
            break;

        if (opt == 't') {
            template = optarg;
        } else if (opt == '?') {
            rrd_set_error("unknown option '%s'", argv[optind-1]);
            return -1;
        }
    }

    /* file name plus at least one data argument must remain */
    if (argc - optind < 2) {
        rrd_set_error("Not enough arguments");
        return -1;
    }

    return rrd_update_r(argv[optind], template,
                        argc - optind - 1, argv + optind + 1);
}
239 int
240 rrd_update_r(char *filename, char *template, int argc, char **argv)
241 {
242 return _rrd_update(filename, template, argc, argv, NULL);
243 }
245 int
246 _rrd_update(char *filename, char *template, int argc, char **argv,
247 info_t *pcdp_summary)
248 {
250 int arg_i = 2;
251 short j;
252 unsigned long i,ii,iii=1;
254 unsigned long rra_begin; /* byte pointer to the rra
255 * area in the rrd file. this
256 * pointer never changes value */
257 unsigned long rra_start; /* byte pointer to the rra
258 * area in the rrd file. this
259 * pointer changes as each rrd is
260 * processed. */
261 unsigned long rra_current; /* byte pointer to the current write
262 * spot in the rrd file. */
263 unsigned long rra_pos_tmp; /* temporary byte pointer. */
264 double interval,
265 pre_int,post_int; /* interval between this and
266 * the last run */
267 unsigned long proc_pdp_st; /* which pdp_st was the last
268 * to be processed */
269 unsigned long occu_pdp_st; /* when was the pdp_st
270 * before the last update
271 * time */
272 unsigned long proc_pdp_age; /* how old was the data in
273 * the pdp prep area when it
274 * was last updated */
275 unsigned long occu_pdp_age; /* how long ago was the last
276 * pdp_step time */
277 rrd_value_t *pdp_new; /* prepare the incoming data
278 * to be added the the
279 * existing entry */
280 rrd_value_t *pdp_temp; /* prepare the pdp values
281 * to be added the the
282 * cdp values */
284 long *tmpl_idx; /* index representing the settings
285 transported by the template index */
286 unsigned long tmpl_cnt = 2; /* time and data */
288 FILE *rrd_file;
289 rrd_t rrd;
290 time_t current_time;
291 time_t rra_time; /* time of update for a RRA */
292 unsigned long current_time_usec; /* microseconds part of current time */
293 struct timeval tmp_time; /* used for time conversion */
295 char **updvals;
296 int schedule_smooth = 0;
297 rrd_value_t *seasonal_coef = NULL, *last_seasonal_coef = NULL;
298 /* a vector of future Holt-Winters seasonal coefs */
299 unsigned long elapsed_pdp_st;
300 /* number of elapsed PDP steps since last update */
301 unsigned long *rra_step_cnt = NULL;
302 /* number of rows to be updated in an RRA for a data
303 * value. */
304 unsigned long start_pdp_offset;
305 /* number of PDP steps since the last update that
306 * are assigned to the first CDP to be generated
307 * since the last update. */
308 unsigned short scratch_idx;
309 /* index into the CDP scratch array */
310 enum cf_en current_cf;
311 /* numeric id of the current consolidation function */
312 rpnstack_t rpnstack; /* used for COMPUTE DS */
313 int version; /* rrd version */
315 rpnstack_init(&rpnstack);
317 /* need at least 1 arguments: data. */
318 if (argc < 1) {
319 rrd_set_error("Not enough arguments");
320 return -1;
321 }
325 if(rrd_open(filename,&rrd_file,&rrd, RRD_READWRITE)==-1){
326 return -1;
327 }
328 /* initialize time */
329 version = atoi(rrd.stat_head->version);
330 gettimeofday(&tmp_time, 0);
331 normalize_time(&tmp_time);
332 current_time = tmp_time.tv_sec;
333 if(version >= 3) {
334 current_time_usec = tmp_time.tv_usec;
335 }
336 else {
337 current_time_usec = 0;
338 }
340 rra_current = rra_start = rra_begin = ftell(rrd_file);
341 /* This is defined in the ANSI C standard, section 7.9.5.3:
343 When a file is opened with update mode ('+' as the second
344 or third character in the ... list of mode argument
345 variables), both input and output may be performed on the
346 associated stream. However, ... input may not be directly
347 followed by output without an intervening call to a file
348 positioning function, unless the input operation encounters
349 end-of-file. */
350 fseek(rrd_file, 0, SEEK_CUR);
353 /* get exclusive lock to whole file.
354 * lock gets removed when we close the file.
355 */
356 if (LockRRD(rrd_file) != 0) {
357 rrd_set_error("could not lock RRD");
358 rrd_free(&rrd);
359 fclose(rrd_file);
360 return(-1);
361 }
363 if((updvals = malloc( sizeof(char*) * (rrd.stat_head->ds_cnt+1)))==NULL){
364 rrd_set_error("allocating updvals pointer array");
365 rrd_free(&rrd);
366 fclose(rrd_file);
367 return(-1);
368 }
370 if ((pdp_temp = malloc(sizeof(rrd_value_t)
371 *rrd.stat_head->ds_cnt))==NULL){
372 rrd_set_error("allocating pdp_temp ...");
373 free(updvals);
374 rrd_free(&rrd);
375 fclose(rrd_file);
376 return(-1);
377 }
379 if ((tmpl_idx = malloc(sizeof(unsigned long)
380 *(rrd.stat_head->ds_cnt+1)))==NULL){
381 rrd_set_error("allocating tmpl_idx ...");
382 free(pdp_temp);
383 free(updvals);
384 rrd_free(&rrd);
385 fclose(rrd_file);
386 return(-1);
387 }
388 /* initialize template redirector */
389 /* default config example (assume DS 1 is a CDEF DS)
390 tmpl_idx[0] -> 0; (time)
391 tmpl_idx[1] -> 1; (DS 0)
392 tmpl_idx[2] -> 3; (DS 2)
393 tmpl_idx[3] -> 4; (DS 3) */
394 tmpl_idx[0] = 0; /* time */
395 for (i = 1, ii = 1 ; i <= rrd.stat_head->ds_cnt ; i++)
396 {
397 if (dst_conv(rrd.ds_def[i-1].dst) != DST_CDEF)
398 tmpl_idx[ii++]=i;
399 }
400 tmpl_cnt= ii;
402 if (template) {
403 char *dsname;
404 unsigned int tmpl_len;
405 dsname = template;
406 tmpl_cnt = 1; /* the first entry is the time */
407 tmpl_len = strlen(template);
408 for(i=0;i<=tmpl_len ;i++) {
409 if (template[i] == ':' || template[i] == '\0') {
410 template[i] = '\0';
411 if (tmpl_cnt>rrd.stat_head->ds_cnt){
412 rrd_set_error("Template contains more DS definitions than RRD");
413 free(updvals); free(pdp_temp);
414 free(tmpl_idx); rrd_free(&rrd);
415 fclose(rrd_file); return(-1);
416 }
417 if ((tmpl_idx[tmpl_cnt++] = ds_match(&rrd,dsname)) == -1){
418 rrd_set_error("unknown DS name '%s'",dsname);
419 free(updvals); free(pdp_temp);
420 free(tmpl_idx); rrd_free(&rrd);
421 fclose(rrd_file); return(-1);
422 } else {
423 /* the first element is always the time */
424 tmpl_idx[tmpl_cnt-1]++;
425 /* go to the next entry on the template */
426 dsname = &template[i+1];
427 /* fix the damage we did before */
428 if (i<tmpl_len) {
429 template[i]=':';
430 }
432 }
433 }
434 }
435 }
436 if ((pdp_new = malloc(sizeof(rrd_value_t)
437 *rrd.stat_head->ds_cnt))==NULL){
438 rrd_set_error("allocating pdp_new ...");
439 free(updvals);
440 free(pdp_temp);
441 free(tmpl_idx);
442 rrd_free(&rrd);
443 fclose(rrd_file);
444 return(-1);
445 }
447 /* loop through the arguments. */
448 for(arg_i=0; arg_i<argc;arg_i++) {
449 char *stepper = malloc((strlen(argv[arg_i])+1)*sizeof(char));
450 char *step_start = stepper;
451 char *p;
452 char *parsetime_error = NULL;
453 enum {atstyle, normal} timesyntax;
454 struct time_value ds_tv;
455 if (stepper == NULL){
456 rrd_set_error("failed duplication argv entry");
457 free(updvals);
458 free(pdp_temp);
459 free(tmpl_idx);
460 rrd_free(&rrd);
461 fclose(rrd_file);
462 return(-1);
463 }
464 /* initialize all ds input to unknown except the first one
465 which has always got to be set */
466 for(ii=1;ii<=rrd.stat_head->ds_cnt;ii++) updvals[ii] = "U";
467 strcpy(stepper,argv[arg_i]);
468 updvals[0]=stepper;
469 /* separate all ds elements; first must be examined separately
470 due to alternate time syntax */
471 if ((p=strchr(stepper,'@'))!=NULL) {
472 timesyntax = atstyle;
473 *p = '\0';
474 stepper = p+1;
475 } else if ((p=strchr(stepper,':'))!=NULL) {
476 timesyntax = normal;
477 *p = '\0';
478 stepper = p+1;
479 } else {
480 rrd_set_error("expected timestamp not found in data source from %s:...",
481 argv[arg_i]);
482 free(step_start);
483 break;
484 }
485 ii=1;
486 updvals[tmpl_idx[ii]] = stepper;
487 while (*stepper) {
488 if (*stepper == ':') {
489 *stepper = '\0';
490 ii++;
491 if (ii<tmpl_cnt){
492 updvals[tmpl_idx[ii]] = stepper+1;
493 }
494 }
495 stepper++;
496 }
498 if (ii != tmpl_cnt-1) {
499 rrd_set_error("expected %lu data source readings (got %lu) from %s:...",
500 tmpl_cnt-1, ii, argv[arg_i]);
501 free(step_start);
502 break;
503 }
505 /* get the time from the reading ... handle N */
506 if (timesyntax == atstyle) {
507 if ((parsetime_error = parsetime(updvals[0], &ds_tv))) {
508 rrd_set_error("ds time: %s: %s", updvals[0], parsetime_error );
509 free(step_start);
510 break;
511 }
512 if (ds_tv.type == RELATIVE_TO_END_TIME ||
513 ds_tv.type == RELATIVE_TO_START_TIME) {
514 rrd_set_error("specifying time relative to the 'start' "
515 "or 'end' makes no sense here: %s",
516 updvals[0]);
517 free(step_start);
518 break;
519 }
521 current_time = mktime(&ds_tv.tm) + ds_tv.offset;
522 current_time_usec = 0; /* FIXME: how to handle usecs here ? */
524 } else if (strcmp(updvals[0],"N")==0){
525 gettimeofday(&tmp_time, 0);
526 normalize_time(&tmp_time);
527 current_time = tmp_time.tv_sec;
528 current_time_usec = tmp_time.tv_usec;
529 } else {
530 double tmp;
531 tmp = strtod(updvals[0], 0);
532 current_time = floor(tmp);
533 current_time_usec = (long)((tmp - current_time) * 1000000L);
534 }
535 /* dont do any correction for old version RRDs */
536 if(version < 3)
537 current_time_usec = 0;
539 if(current_time <= rrd.live_head->last_up){
540 rrd_set_error("illegal attempt to update using time %ld when "
541 "last update time is %ld (minimum one second step)",
542 current_time, rrd.live_head->last_up);
543 free(step_start);
544 break;
545 }
548 /* seek to the beginning of the rra's */
549 if (rra_current != rra_begin) {
550 if(fseek(rrd_file, rra_begin, SEEK_SET) != 0) {
551 rrd_set_error("seek error in rrd");
552 free(step_start);
553 break;
554 }
555 rra_current = rra_begin;
556 }
557 rra_start = rra_begin;
559 /* when was the current pdp started */
560 proc_pdp_age = rrd.live_head->last_up % rrd.stat_head->pdp_step;
561 proc_pdp_st = rrd.live_head->last_up - proc_pdp_age;
563 /* when did the last pdp_st occur */
564 occu_pdp_age = current_time % rrd.stat_head->pdp_step;
565 occu_pdp_st = current_time - occu_pdp_age;
566 /* interval = current_time - rrd.live_head->last_up; */
567 interval = current_time + ((double)current_time_usec - (double)rrd.live_head->last_up_usec)/1000000.0 - rrd.live_head->last_up;
569 if (occu_pdp_st > proc_pdp_st){
570 /* OK we passed the pdp_st moment*/
571 pre_int = occu_pdp_st - rrd.live_head->last_up; /* how much of the input data
572 * occurred before the latest
573 * pdp_st moment*/
574 pre_int -= ((double)rrd.live_head->last_up_usec)/1000000.0; /* adjust usecs */
575 post_int = occu_pdp_age; /* how much after it */
576 post_int += ((double)current_time_usec)/1000000.0; /* adjust usecs */
577 } else {
578 pre_int = interval;
579 post_int = 0;
580 }
582 #ifdef DEBUG
583 printf(
584 "proc_pdp_age %lu\t"
585 "proc_pdp_st %lu\t"
586 "occu_pfp_age %lu\t"
587 "occu_pdp_st %lu\t"
588 "int %lf\t"
589 "pre_int %lf\t"
590 "post_int %lf\n", proc_pdp_age, proc_pdp_st,
591 occu_pdp_age, occu_pdp_st,
592 interval, pre_int, post_int);
593 #endif
595 /* process the data sources and update the pdp_prep
596 * area accordingly */
597 for(i=0;i<rrd.stat_head->ds_cnt;i++){
598 enum dst_en dst_idx;
599 dst_idx= dst_conv(rrd.ds_def[i].dst);
600 /* NOTE: DST_CDEF should never enter this if block, because
601 * updvals[i+1][0] is initialized to 'U'; unless the caller
602 * accidentally specified a value for the DST_CDEF. To handle
603 * this case, an extra check is required. */
604 if((updvals[i+1][0] != 'U') &&
605 (dst_idx != DST_CDEF) &&
606 rrd.ds_def[i].par[DS_mrhb_cnt].u_cnt >= interval) {
607 double rate = DNAN;
608 /* the data source type defines how to process the data */
609 /* pdp_new contains rate * time ... eg the bytes
610 * transferred during the interval. Doing it this way saves
611 * a lot of math operations */
614 switch(dst_idx){
615 case DST_COUNTER:
616 case DST_DERIVE:
617 if(rrd.pdp_prep[i].last_ds[0] != 'U'){
618 pdp_new[i]= rrd_diff(updvals[i+1],rrd.pdp_prep[i].last_ds);
619 if(dst_idx == DST_COUNTER) {
620 /* simple overflow catcher suggested by Andres Kroonmaa */
621 /* this will fail terribly for non 32 or 64 bit counters ... */
622 /* are there any others in SNMP land ? */
623 if (pdp_new[i] < (double)0.0 )
624 pdp_new[i] += (double)4294967296.0 ; /* 2^32 */
625 if (pdp_new[i] < (double)0.0 )
626 pdp_new[i] += (double)18446744069414584320.0; /* 2^64-2^32 */;
627 }
628 rate = pdp_new[i] / interval;
629 }
630 else {
631 pdp_new[i]= DNAN;
632 }
633 break;
634 case DST_ABSOLUTE:
635 pdp_new[i]= atof(updvals[i+1]);
636 rate = pdp_new[i] / interval;
637 break;
638 case DST_GAUGE:
639 pdp_new[i] = atof(updvals[i+1]) * interval;
640 rate = pdp_new[i] / interval;
641 break;
642 default:
643 rrd_set_error("rrd contains unknown DS type : '%s'",
644 rrd.ds_def[i].dst);
645 break;
646 }
647 /* break out of this for loop if the error string is set */
648 if (rrd_test_error()){
649 break;
650 }
651 /* make sure pdp_temp is neither too large or too small
652 * if any of these occur it becomes unknown ...
653 * sorry folks ... */
654 if ( ! isnan(rate) &&
655 (( ! isnan(rrd.ds_def[i].par[DS_max_val].u_val) &&
656 rate > rrd.ds_def[i].par[DS_max_val].u_val ) ||
657 ( ! isnan(rrd.ds_def[i].par[DS_min_val].u_val) &&
658 rate < rrd.ds_def[i].par[DS_min_val].u_val ))){
659 pdp_new[i] = DNAN;
660 }
661 } else {
662 /* no news is news all the same */
663 pdp_new[i] = DNAN;
664 }
666 /* make a copy of the command line argument for the next run */
667 #ifdef DEBUG
668 fprintf(stderr,
669 "prep ds[%lu]\t"
670 "last_arg '%s'\t"
671 "this_arg '%s'\t"
672 "pdp_new %10.2f\n",
673 i,
674 rrd.pdp_prep[i].last_ds,
675 updvals[i+1], pdp_new[i]);
676 #endif
677 if(dst_idx == DST_COUNTER || dst_idx == DST_DERIVE){
678 strncpy(rrd.pdp_prep[i].last_ds,
679 updvals[i+1],LAST_DS_LEN-1);
680 rrd.pdp_prep[i].last_ds[LAST_DS_LEN-1]='\0';
681 }
682 }
683 /* break out of the argument parsing loop if the error_string is set */
684 if (rrd_test_error()){
685 free(step_start);
686 break;
687 }
688 /* has a pdp_st moment occurred since the last run ? */
690 if (proc_pdp_st == occu_pdp_st){
691 /* no we have not passed a pdp_st moment. therefore update is simple */
693 for(i=0;i<rrd.stat_head->ds_cnt;i++){
694 if(isnan(pdp_new[i]))
695 rrd.pdp_prep[i].scratch[PDP_unkn_sec_cnt].u_cnt += interval;
696 else
697 rrd.pdp_prep[i].scratch[PDP_val].u_val+= pdp_new[i];
698 #ifdef DEBUG
699 fprintf(stderr,
700 "NO PDP ds[%lu]\t"
701 "value %10.2f\t"
702 "unkn_sec %5lu\n",
703 i,
704 rrd.pdp_prep[i].scratch[PDP_val].u_val,
705 rrd.pdp_prep[i].scratch[PDP_unkn_sec_cnt].u_cnt);
706 #endif
707 }
708 } else {
709 /* an pdp_st has occurred. */
711 /* in pdp_prep[].scratch[PDP_val].u_val we have collected rate*seconds which
712 * occurred up to the last run.
713 pdp_new[] contains rate*seconds from the latest run.
714 pdp_temp[] will contain the rate for cdp */
716 for(i=0;i<rrd.stat_head->ds_cnt;i++){
717 /* update pdp_prep to the current pdp_st */
718 if(isnan(pdp_new[i]))
719 rrd.pdp_prep[i].scratch[PDP_unkn_sec_cnt].u_cnt += pre_int;
720 else
721 rrd.pdp_prep[i].scratch[PDP_val].u_val +=
722 pdp_new[i]/(double)interval*(double)pre_int;
724 /* if too much of the pdp_prep is unknown we dump it */
725 if ((rrd.pdp_prep[i].scratch[PDP_unkn_sec_cnt].u_cnt
726 > rrd.ds_def[i].par[DS_mrhb_cnt].u_cnt) ||
727 (occu_pdp_st-proc_pdp_st <=
728 rrd.pdp_prep[i].scratch[PDP_unkn_sec_cnt].u_cnt)) {
729 pdp_temp[i] = DNAN;
730 } else {
731 pdp_temp[i] = rrd.pdp_prep[i].scratch[PDP_val].u_val
732 / (double)( occu_pdp_st
733 - proc_pdp_st
734 - rrd.pdp_prep[i].scratch[PDP_unkn_sec_cnt].u_cnt);
735 }
737 /* process CDEF data sources; remember each CDEF DS can
738 * only reference other DS with a lower index number */
739 if (dst_conv(rrd.ds_def[i].dst) == DST_CDEF) {
740 rpnp_t *rpnp;
741 rpnp = rpn_expand((rpn_cdefds_t *) &(rrd.ds_def[i].par[DS_cdef]));
742 /* substitute data values for OP_VARIABLE nodes */
743 for (ii = 0; rpnp[ii].op != OP_END; ii++)
744 {
745 if (rpnp[ii].op == OP_VARIABLE) {
746 rpnp[ii].op = OP_NUMBER;
747 rpnp[ii].val = pdp_temp[rpnp[ii].ptr];
748 }
749 }
750 /* run the rpn calculator */
751 if (rpn_calc(rpnp,&rpnstack,0,pdp_temp,i) == -1) {
752 free(rpnp);
753 break; /* exits the data sources pdp_temp loop */
754 }
755 }
757 /* make pdp_prep ready for the next run */
758 if(isnan(pdp_new[i])){
759 rrd.pdp_prep[i].scratch[PDP_unkn_sec_cnt].u_cnt = post_int;
760 rrd.pdp_prep[i].scratch[PDP_val].u_val = 0.0;
761 } else {
762 rrd.pdp_prep[i].scratch[PDP_unkn_sec_cnt].u_cnt = 0;
763 rrd.pdp_prep[i].scratch[PDP_val].u_val =
764 pdp_new[i]/(double)interval*(double)post_int;
765 }
767 #ifdef DEBUG
768 fprintf(stderr,
769 "PDP UPD ds[%lu]\t"
770 "pdp_temp %10.2f\t"
771 "new_prep %10.2f\t"
772 "new_unkn_sec %5lu\n",
773 i, pdp_temp[i],
774 rrd.pdp_prep[i].scratch[PDP_val].u_val,
775 rrd.pdp_prep[i].scratch[PDP_unkn_sec_cnt].u_cnt);
776 #endif
777 }
779 /* if there were errors during the last loop, bail out here */
780 if (rrd_test_error()){
781 free(step_start);
782 break;
783 }
785 /* compute the number of elapsed pdp_st moments */
786 elapsed_pdp_st = (occu_pdp_st - proc_pdp_st) / rrd.stat_head -> pdp_step;
787 #ifdef DEBUG
788 fprintf(stderr,"elapsed PDP steps: %lu\n", elapsed_pdp_st);
789 #endif
790 if (rra_step_cnt == NULL)
791 {
792 rra_step_cnt = (unsigned long *)
793 malloc((rrd.stat_head->rra_cnt)* sizeof(unsigned long));
794 }
796 for(i = 0, rra_start = rra_begin;
797 i < rrd.stat_head->rra_cnt;
798 rra_start += rrd.rra_def[i].row_cnt * rrd.stat_head -> ds_cnt * sizeof(rrd_value_t),
799 i++)
800 {
801 current_cf = cf_conv(rrd.rra_def[i].cf_nam);
802 start_pdp_offset = rrd.rra_def[i].pdp_cnt -
803 (proc_pdp_st / rrd.stat_head -> pdp_step) % rrd.rra_def[i].pdp_cnt;
804 if (start_pdp_offset <= elapsed_pdp_st) {
805 rra_step_cnt[i] = (elapsed_pdp_st - start_pdp_offset) /
806 rrd.rra_def[i].pdp_cnt + 1;
807 } else {
808 rra_step_cnt[i] = 0;
809 }
811 if (current_cf == CF_SEASONAL || current_cf == CF_DEVSEASONAL)
812 {
813 /* If this is a bulk update, we need to skip ahead in the seasonal
814 * arrays so that they will be correct for the next observed value;
815 * note that for the bulk update itself, no update will occur to
816 * DEVSEASONAL or SEASONAL; furthermore, HWPREDICT and DEVPREDICT will
817 * be set to DNAN. */
818 if (rra_step_cnt[i] > 2)
819 {
820 /* skip update by resetting rra_step_cnt[i],
821 * note that this is not data source specific; this is due
822 * to the bulk update, not a DNAN value for the specific data
823 * source. */
824 rra_step_cnt[i] = 0;
825 lookup_seasonal(&rrd,i,rra_start,rrd_file,elapsed_pdp_st,
826 &last_seasonal_coef);
827 lookup_seasonal(&rrd,i,rra_start,rrd_file,elapsed_pdp_st + 1,
828 &seasonal_coef);
829 }
831 /* periodically run a smoother for seasonal effects */
832 /* Need to use first cdp parameter buffer to track
833 * burnin (burnin requires a specific smoothing schedule).
834 * The CDP_init_seasonal parameter is really an RRA level,
835 * not a data source within RRA level parameter, but the rra_def
836 * is read only for rrd_update (not flushed to disk). */
837 iii = i*(rrd.stat_head -> ds_cnt);
838 if (rrd.cdp_prep[iii].scratch[CDP_init_seasonal].u_cnt
839 <= BURNIN_CYCLES)
840 {
841 if (rrd.rra_ptr[i].cur_row + elapsed_pdp_st
842 > rrd.rra_def[i].row_cnt - 1) {
843 /* mark off one of the burnin cycles */
844 ++(rrd.cdp_prep[iii].scratch[CDP_init_seasonal].u_cnt);
845 schedule_smooth = 1;
846 }
847 } else {
848 /* someone has no doubt invented a trick to deal with this
849 * wrap around, but at least this code is clear. */
850 if (rrd.rra_def[i].par[RRA_seasonal_smooth_idx].u_cnt >
851 rrd.rra_ptr[i].cur_row)
852 {
853 /* here elapsed_pdp_st = rra_step_cnt[i] because of 1-1
854 * mapping between PDP and CDP */
855 if (rrd.rra_ptr[i].cur_row + elapsed_pdp_st
856 >= rrd.rra_def[i].par[RRA_seasonal_smooth_idx].u_cnt)
857 {
858 #ifdef DEBUG
859 fprintf(stderr,
860 "schedule_smooth 1: cur_row %lu, elapsed_pdp_st %lu, smooth idx %lu\n",
861 rrd.rra_ptr[i].cur_row, elapsed_pdp_st,
862 rrd.rra_def[i].par[RRA_seasonal_smooth_idx].u_cnt);
863 #endif
864 schedule_smooth = 1;
865 }
866 } else {
867 /* can't rely on negative numbers because we are working with
868 * unsigned values */
869 /* Don't need modulus here. If we've wrapped more than once, only
870 * one smooth is executed at the end. */
871 if (rrd.rra_ptr[i].cur_row + elapsed_pdp_st >= rrd.rra_def[i].row_cnt
872 && rrd.rra_ptr[i].cur_row + elapsed_pdp_st - rrd.rra_def[i].row_cnt
873 >= rrd.rra_def[i].par[RRA_seasonal_smooth_idx].u_cnt)
874 {
875 #ifdef DEBUG
876 fprintf(stderr,
877 "schedule_smooth 2: cur_row %lu, elapsed_pdp_st %lu, smooth idx %lu\n",
878 rrd.rra_ptr[i].cur_row, elapsed_pdp_st,
879 rrd.rra_def[i].par[RRA_seasonal_smooth_idx].u_cnt);
880 #endif
881 schedule_smooth = 1;
882 }
883 }
884 }
886 rra_current = ftell(rrd_file);
887 } /* if cf is DEVSEASONAL or SEASONAL */
889 if (rrd_test_error()) break;
891 /* update CDP_PREP areas */
892 /* loop over data sources within each RRA */
893 for(ii = 0;
894 ii < rrd.stat_head->ds_cnt;
895 ii++)
896 {
898 /* iii indexes the CDP prep area for this data source within the RRA */
899 iii=i*rrd.stat_head->ds_cnt+ii;
901 if (rrd.rra_def[i].pdp_cnt > 1) {
903 if (rra_step_cnt[i] > 0) {
904 /* If we are in this block, as least 1 CDP value will be written to
905 * disk, this is the CDP_primary_val entry. If more than 1 value needs
906 * to be written, then the "fill in" value is the CDP_secondary_val
907 * entry. */
908 if (isnan(pdp_temp[ii]))
909 {
910 rrd.cdp_prep[iii].scratch[CDP_unkn_pdp_cnt].u_cnt += start_pdp_offset;
911 rrd.cdp_prep[iii].scratch[CDP_secondary_val].u_val = DNAN;
912 } else {
913 /* CDP_secondary value is the RRA "fill in" value for intermediary
914 * CDP data entries. No matter the CF, the value is the same because
915 * the average, max, min, and last of a list of identical values is
916 * the same, namely, the value itself. */
917 rrd.cdp_prep[iii].scratch[CDP_secondary_val].u_val = pdp_temp[ii];
918 }
920 if (rrd.cdp_prep[iii].scratch[CDP_unkn_pdp_cnt].u_cnt
921 > rrd.rra_def[i].pdp_cnt*
922 rrd.rra_def[i].par[RRA_cdp_xff_val].u_val)
923 {
924 rrd.cdp_prep[iii].scratch[CDP_primary_val].u_val = DNAN;
925 /* initialize carry over */
926 if (current_cf == CF_AVERAGE) {
927 if (isnan(pdp_temp[ii])) {
928 rrd.cdp_prep[iii].scratch[CDP_val].u_val = DNAN;
929 } else {
930 rrd.cdp_prep[iii].scratch[CDP_val].u_val = pdp_temp[ii] *
931 ((elapsed_pdp_st - start_pdp_offset) % rrd.rra_def[i].pdp_cnt);
932 }
933 } else {
934 rrd.cdp_prep[iii].scratch[CDP_val].u_val = pdp_temp[ii];
935 }
936 } else {
937 rrd_value_t cum_val, cur_val;
938 switch (current_cf) {
939 case CF_AVERAGE:
940 cum_val = IFDNAN(rrd.cdp_prep[iii].scratch[CDP_val].u_val, 0.0);
941 cur_val = IFDNAN(pdp_temp[ii],0.0);
942 rrd.cdp_prep[iii].scratch[CDP_primary_val].u_val =
943 (cum_val + cur_val * start_pdp_offset) /
944 (rrd.rra_def[i].pdp_cnt
945 -rrd.cdp_prep[iii].scratch[CDP_unkn_pdp_cnt].u_cnt);
946 /* initialize carry over value */
947 if (isnan(pdp_temp[ii])) {
948 rrd.cdp_prep[iii].scratch[CDP_val].u_val = DNAN;
949 } else {
950 rrd.cdp_prep[iii].scratch[CDP_val].u_val = pdp_temp[ii] *
951 ((elapsed_pdp_st - start_pdp_offset) % rrd.rra_def[i].pdp_cnt);
952 }
953 break;
954 case CF_MAXIMUM:
955 cum_val = IFDNAN(rrd.cdp_prep[iii].scratch[CDP_val].u_val, -DINF);
956 cur_val = IFDNAN(pdp_temp[ii],-DINF);
957 #ifdef DEBUG
958 if (isnan(rrd.cdp_prep[iii].scratch[CDP_val].u_val) &&
959 isnan(pdp_temp[ii])) {
960 fprintf(stderr,
961 "RRA %lu, DS %lu, both CDP_val and pdp_temp are DNAN!",
962 i,ii);
963 exit(-1);
964 }
965 #endif
966 if (cur_val > cum_val)
967 rrd.cdp_prep[iii].scratch[CDP_primary_val].u_val = cur_val;
968 else
969 rrd.cdp_prep[iii].scratch[CDP_primary_val].u_val = cum_val;
970 /* initialize carry over value */
971 rrd.cdp_prep[iii].scratch[CDP_val].u_val = pdp_temp[ii];
972 break;
973 case CF_MINIMUM:
974 cum_val = IFDNAN(rrd.cdp_prep[iii].scratch[CDP_val].u_val, DINF);
975 cur_val = IFDNAN(pdp_temp[ii],DINF);
976 #ifdef DEBUG
977 if (isnan(rrd.cdp_prep[iii].scratch[CDP_val].u_val) &&
978 isnan(pdp_temp[ii])) {
979 fprintf(stderr,
980 "RRA %lu, DS %lu, both CDP_val and pdp_temp are DNAN!",
981 i,ii);
982 exit(-1);
983 }
984 #endif
985 if (cur_val < cum_val)
986 rrd.cdp_prep[iii].scratch[CDP_primary_val].u_val = cur_val;
987 else
988 rrd.cdp_prep[iii].scratch[CDP_primary_val].u_val = cum_val;
989 /* initialize carry over value */
990 rrd.cdp_prep[iii].scratch[CDP_val].u_val = pdp_temp[ii];
991 break;
992 case CF_LAST:
993 default:
994 rrd.cdp_prep[iii].scratch[CDP_primary_val].u_val = pdp_temp[ii];
995 /* initialize carry over value */
996 rrd.cdp_prep[iii].scratch[CDP_val].u_val = pdp_temp[ii];
997 break;
998 }
999 } /* endif meets xff value requirement for a valid value */
1000 /* initialize carry over CDP_unkn_pdp_cnt, this must after CDP_primary_val
1001 * is set because CDP_unkn_pdp_cnt is required to compute that value. */
1002 if (isnan(pdp_temp[ii]))
1003 rrd.cdp_prep[iii].scratch[CDP_unkn_pdp_cnt].u_cnt =
1004 (elapsed_pdp_st - start_pdp_offset) % rrd.rra_def[i].pdp_cnt;
1005 else
1006 rrd.cdp_prep[iii].scratch[CDP_unkn_pdp_cnt].u_cnt = 0;
1007 } else /* rra_step_cnt[i] == 0 */
1008 {
1009 #ifdef DEBUG
1010 if (isnan(rrd.cdp_prep[iii].scratch[CDP_val].u_val)) {
1011 fprintf(stderr,"schedule CDP_val update, RRA %lu DS %lu, DNAN\n",
1012 i,ii);
1013 } else {
1014 fprintf(stderr,"schedule CDP_val update, RRA %lu DS %lu, %10.2f\n",
1015 i,ii,rrd.cdp_prep[iii].scratch[CDP_val].u_val);
1016 }
1017 #endif
1018 if (isnan(pdp_temp[ii])) {
1019 rrd.cdp_prep[iii].scratch[CDP_unkn_pdp_cnt].u_cnt += elapsed_pdp_st;
1020 } else if (isnan(rrd.cdp_prep[iii].scratch[CDP_val].u_val))
1021 {
1022 if (current_cf == CF_AVERAGE) {
1023 rrd.cdp_prep[iii].scratch[CDP_val].u_val = pdp_temp[ii] *
1024 elapsed_pdp_st;
1025 } else {
1026 rrd.cdp_prep[iii].scratch[CDP_val].u_val = pdp_temp[ii];
1027 }
1028 #ifdef DEBUG
1029 fprintf(stderr,"Initialize CDP_val for RRA %lu DS %lu: %10.2f\n",
1030 i,ii,rrd.cdp_prep[iii].scratch[CDP_val].u_val);
1031 #endif
1032 } else {
1033 switch (current_cf) {
1034 case CF_AVERAGE:
1035 rrd.cdp_prep[iii].scratch[CDP_val].u_val += pdp_temp[ii] *
1036 elapsed_pdp_st;
1037 break;
1038 case CF_MINIMUM:
1039 if (pdp_temp[ii] < rrd.cdp_prep[iii].scratch[CDP_val].u_val)
1040 rrd.cdp_prep[iii].scratch[CDP_val].u_val = pdp_temp[ii];
1041 break;
1042 case CF_MAXIMUM:
1043 if (pdp_temp[ii] > rrd.cdp_prep[iii].scratch[CDP_val].u_val)
1044 rrd.cdp_prep[iii].scratch[CDP_val].u_val = pdp_temp[ii];
1045 break;
1046 case CF_LAST:
1047 default:
1048 rrd.cdp_prep[iii].scratch[CDP_val].u_val = pdp_temp[ii];
1049 break;
1050 }
1051 }
1052 }
1053 } else { /* rrd.rra_def[i].pdp_cnt == 1 */
1054 if (elapsed_pdp_st > 2)
1055 {
1056 switch (current_cf) {
1057 case CF_AVERAGE:
1058 default:
1059 rrd.cdp_prep[iii].scratch[CDP_primary_val].u_val=pdp_temp[ii];
1060 rrd.cdp_prep[iii].scratch[CDP_secondary_val].u_val=pdp_temp[ii];
1061 break;
1062 case CF_SEASONAL:
1063 case CF_DEVSEASONAL:
1064 /* need to update cached seasonal values, so they are consistent
1065 * with the bulk update */
1066 /* WARNING: code relies on the fact that CDP_hw_last_seasonal and
1067 * CDP_last_deviation are the same. */
1068 rrd.cdp_prep[iii].scratch[CDP_hw_last_seasonal].u_val =
1069 last_seasonal_coef[ii];
1070 rrd.cdp_prep[iii].scratch[CDP_hw_seasonal].u_val =
1071 seasonal_coef[ii];
1072 break;
1073 case CF_HWPREDICT:
1074 /* need to update the null_count and last_null_count.
1075 * even do this for non-DNAN pdp_temp because the
1076 * algorithm is not learning from batch updates. */
1077 rrd.cdp_prep[iii].scratch[CDP_null_count].u_cnt +=
1078 elapsed_pdp_st;
1079 rrd.cdp_prep[iii].scratch[CDP_last_null_count].u_cnt +=
1080 elapsed_pdp_st - 1;
1081 /* fall through */
1082 case CF_DEVPREDICT:
1083 rrd.cdp_prep[iii].scratch[CDP_primary_val].u_val = DNAN;
1084 rrd.cdp_prep[iii].scratch[CDP_secondary_val].u_val = DNAN;
1085 break;
1086 case CF_FAILURES:
1087 /* do not count missed bulk values as failures */
1088 rrd.cdp_prep[iii].scratch[CDP_primary_val].u_val = 0;
1089 rrd.cdp_prep[iii].scratch[CDP_secondary_val].u_val = 0;
1090 /* need to reset violations buffer.
1091 * could do this more carefully, but for now, just
1092 * assume a bulk update wipes away all violations. */
1093 erase_violations(&rrd, iii, i);
1094 break;
1095 }
1096 }
1097 } /* endif rrd.rra_def[i].pdp_cnt == 1 */
1099 if (rrd_test_error()) break;
1101 } /* endif data sources loop */
1102 } /* end RRA Loop */
1104 /* this loop is only entered if elapsed_pdp_st < 3 */
1105 for (j = elapsed_pdp_st, scratch_idx = CDP_primary_val;
1106 j > 0 && j < 3; j--, scratch_idx = CDP_secondary_val)
1107 {
1108 for(i = 0, rra_start = rra_begin;
1109 i < rrd.stat_head->rra_cnt;
1110 rra_start += rrd.rra_def[i].row_cnt * rrd.stat_head -> ds_cnt * sizeof(rrd_value_t),
1111 i++)
1112 {
1113 if (rrd.rra_def[i].pdp_cnt > 1) continue;
1115 current_cf = cf_conv(rrd.rra_def[i].cf_nam);
1116 if (current_cf == CF_SEASONAL || current_cf == CF_DEVSEASONAL)
1117 {
1118 lookup_seasonal(&rrd,i,rra_start,rrd_file,
1119 elapsed_pdp_st + (scratch_idx == CDP_primary_val ? 1 : 2),
1120 &seasonal_coef);
1121 rra_current = ftell(rrd_file);
1122 }
1123 if (rrd_test_error()) break;
1124 /* loop over data soures within each RRA */
1125 for(ii = 0;
1126 ii < rrd.stat_head->ds_cnt;
1127 ii++)
1128 {
1129 update_aberrant_CF(&rrd,pdp_temp[ii],current_cf,
1130 i*(rrd.stat_head->ds_cnt) + ii,i,ii,
1131 scratch_idx, seasonal_coef);
1132 }
1133 } /* end RRA Loop */
1134 if (rrd_test_error()) break;
1135 } /* end elapsed_pdp_st loop */
1137 if (rrd_test_error()) break;
1139 /* Ready to write to disk */
1140 /* Move sequentially through the file, writing one RRA at a time.
1141 * Note this architecture divorces the computation of CDP with
1142 * flushing updated RRA entries to disk. */
1143 for(i = 0, rra_start = rra_begin;
1144 i < rrd.stat_head->rra_cnt;
1145 rra_start += rrd.rra_def[i].row_cnt * rrd.stat_head -> ds_cnt * sizeof(rrd_value_t),
1146 i++) {
1147 /* is there anything to write for this RRA? If not, continue. */
1148 if (rra_step_cnt[i] == 0) continue;
1150 /* write the first row */
1151 #ifdef DEBUG
1152 fprintf(stderr," -- RRA Preseek %ld\n",ftell(rrd_file));
1153 #endif
1154 rrd.rra_ptr[i].cur_row++;
1155 if (rrd.rra_ptr[i].cur_row >= rrd.rra_def[i].row_cnt)
1156 rrd.rra_ptr[i].cur_row = 0; /* wrap around */
1157 /* positition on the first row */
1158 rra_pos_tmp = rra_start +
1159 (rrd.stat_head->ds_cnt)*(rrd.rra_ptr[i].cur_row)*sizeof(rrd_value_t);
1160 if(rra_pos_tmp != rra_current) {
1161 if(fseek(rrd_file, rra_pos_tmp, SEEK_SET) != 0){
1162 rrd_set_error("seek error in rrd");
1163 break;
1164 }
1165 rra_current = rra_pos_tmp;
1166 }
1168 #ifdef DEBUG
1169 fprintf(stderr," -- RRA Postseek %ld\n",ftell(rrd_file));
1170 #endif
1171 scratch_idx = CDP_primary_val;
1172 if (pcdp_summary != NULL)
1173 {
1174 rra_time = (current_time - current_time
1175 % (rrd.rra_def[i].pdp_cnt*rrd.stat_head->pdp_step))
1176 - ((rra_step_cnt[i]-1)*rrd.rra_def[i].pdp_cnt*rrd.stat_head->pdp_step);
1177 }
1178 pcdp_summary = write_RRA_row(&rrd, i, &rra_current, scratch_idx, rrd_file,
1179 pcdp_summary, &rra_time);
1180 if (rrd_test_error()) break;
1182 /* write other rows of the bulk update, if any */
1183 scratch_idx = CDP_secondary_val;
1184 for ( ; rra_step_cnt[i] > 1;
1185 rra_step_cnt[i]--, rrd.rra_ptr[i].cur_row++)
1186 {
1187 if (rrd.rra_ptr[i].cur_row == rrd.rra_def[i].row_cnt)
1188 {
1189 #ifdef DEBUG
1190 fprintf(stderr,"Wraparound for RRA %s, %lu updates left\n",
1191 rrd.rra_def[i].cf_nam, rra_step_cnt[i] - 1);
1192 #endif
1193 /* wrap */
1194 rrd.rra_ptr[i].cur_row = 0;
1195 /* seek back to beginning of current rra */
1196 if (fseek(rrd_file, rra_start, SEEK_SET) != 0)
1197 {
1198 rrd_set_error("seek error in rrd");
1199 break;
1200 }
1201 #ifdef DEBUG
1202 fprintf(stderr," -- Wraparound Postseek %ld\n",ftell(rrd_file));
1203 #endif
1204 rra_current = rra_start;
1205 }
1206 if (pcdp_summary != NULL)
1207 {
1208 rra_time = (current_time - current_time
1209 % (rrd.rra_def[i].pdp_cnt*rrd.stat_head->pdp_step))
1210 - ((rra_step_cnt[i]-2)*rrd.rra_def[i].pdp_cnt*rrd.stat_head->pdp_step);
1211 }
1212 pcdp_summary = write_RRA_row(&rrd, i, &rra_current, scratch_idx, rrd_file,
1213 pcdp_summary, &rra_time);
1214 }
1216 if (rrd_test_error())
1217 break;
1218 } /* RRA LOOP */
1220 /* break out of the argument parsing loop if error_string is set */
1221 if (rrd_test_error()){
1222 free(step_start);
1223 break;
1224 }
1226 } /* endif a pdp_st has occurred */
1227 rrd.live_head->last_up = current_time;
1228 rrd.live_head->last_up_usec = current_time_usec;
1229 free(step_start);
1230 } /* function argument loop */
1232 if (seasonal_coef != NULL) free(seasonal_coef);
1233 if (last_seasonal_coef != NULL) free(last_seasonal_coef);
1234 if (rra_step_cnt != NULL) free(rra_step_cnt);
1235 rpnstack_free(&rpnstack);
1237 /* if we got here and if there is an error and if the file has not been
1238 * written to, then close things up and return. */
1239 if (rrd_test_error()) {
1240 free(updvals);
1241 free(tmpl_idx);
1242 rrd_free(&rrd);
1243 free(pdp_temp);
1244 free(pdp_new);
1245 fclose(rrd_file);
1246 return(-1);
1247 }
1249 /* aargh ... that was tough ... so many loops ... anyway, its done.
1250 * we just need to write back the live header portion now*/
1252 if (fseek(rrd_file, (sizeof(stat_head_t)
1253 + sizeof(ds_def_t)*rrd.stat_head->ds_cnt
1254 + sizeof(rra_def_t)*rrd.stat_head->rra_cnt),
1255 SEEK_SET) != 0) {
1256 rrd_set_error("seek rrd for live header writeback");
1257 free(updvals);
1258 free(tmpl_idx);
1259 rrd_free(&rrd);
1260 free(pdp_temp);
1261 free(pdp_new);
1262 fclose(rrd_file);
1263 return(-1);
1264 }
1266 if(version >= 3) {
1267 if(fwrite( rrd.live_head,
1268 sizeof(live_head_t), 1, rrd_file) != 1){
1269 rrd_set_error("fwrite live_head to rrd");
1270 free(updvals);
1271 rrd_free(&rrd);
1272 free(tmpl_idx);
1273 free(pdp_temp);
1274 free(pdp_new);
1275 fclose(rrd_file);
1276 return(-1);
1277 }
1278 }
1279 else {
1280 if(fwrite( &rrd.live_head->last_up,
1281 sizeof(time_t), 1, rrd_file) != 1){
1282 rrd_set_error("fwrite live_head to rrd");
1283 free(updvals);
1284 rrd_free(&rrd);
1285 free(tmpl_idx);
1286 free(pdp_temp);
1287 free(pdp_new);
1288 fclose(rrd_file);
1289 return(-1);
1290 }
1291 }
1294 if(fwrite( rrd.pdp_prep,
1295 sizeof(pdp_prep_t),
1296 rrd.stat_head->ds_cnt, rrd_file) != rrd.stat_head->ds_cnt){
1297 rrd_set_error("ftwrite pdp_prep to rrd");
1298 free(updvals);
1299 rrd_free(&rrd);
1300 free(tmpl_idx);
1301 free(pdp_temp);
1302 free(pdp_new);
1303 fclose(rrd_file);
1304 return(-1);
1305 }
1307 if(fwrite( rrd.cdp_prep,
1308 sizeof(cdp_prep_t),
1309 rrd.stat_head->rra_cnt *rrd.stat_head->ds_cnt, rrd_file)
1310 != rrd.stat_head->rra_cnt *rrd.stat_head->ds_cnt){
1312 rrd_set_error("ftwrite cdp_prep to rrd");
1313 free(updvals);
1314 free(tmpl_idx);
1315 rrd_free(&rrd);
1316 free(pdp_temp);
1317 free(pdp_new);
1318 fclose(rrd_file);
1319 return(-1);
1320 }
1322 if(fwrite( rrd.rra_ptr,
1323 sizeof(rra_ptr_t),
1324 rrd.stat_head->rra_cnt,rrd_file) != rrd.stat_head->rra_cnt){
1325 rrd_set_error("fwrite rra_ptr to rrd");
1326 free(updvals);
1327 free(tmpl_idx);
1328 rrd_free(&rrd);
1329 free(pdp_temp);
1330 free(pdp_new);
1331 fclose(rrd_file);
1332 return(-1);
1333 }
1335 /* OK now close the files and free the memory */
1336 if(fclose(rrd_file) != 0){
1337 rrd_set_error("closing rrd");
1338 free(updvals);
1339 free(tmpl_idx);
1340 rrd_free(&rrd);
1341 free(pdp_temp);
1342 free(pdp_new);
1343 return(-1);
1344 }
1346 /* calling the smoothing code here guarantees at most
1347 * one smoothing operation per rrd_update call. Unfortunately,
1348 * it is possible with bulk updates, or a long-delayed update
1349 * for smoothing to occur off-schedule. This really isn't
1350 * critical except during the burning cycles. */
1351 if (schedule_smooth)
1352 {
1353 #ifndef WIN32
1354 rrd_file = fopen(filename,"r+");
1355 #else
1356 rrd_file = fopen(filename,"rb+");
1357 #endif
1358 rra_start = rra_begin;
1359 for (i = 0; i < rrd.stat_head -> rra_cnt; ++i)
1360 {
1361 if (cf_conv(rrd.rra_def[i].cf_nam) == CF_DEVSEASONAL ||
1362 cf_conv(rrd.rra_def[i].cf_nam) == CF_SEASONAL)
1363 {
1364 #ifdef DEBUG
1365 fprintf(stderr,"Running smoother for rra %ld\n",i);
1366 #endif
1367 apply_smoother(&rrd,i,rra_start,rrd_file);
1368 if (rrd_test_error())
1369 break;
1370 }
1371 rra_start += rrd.rra_def[i].row_cnt
1372 *rrd.stat_head->ds_cnt*sizeof(rrd_value_t);
1373 }
1374 fclose(rrd_file);
1375 }
1376 rrd_free(&rrd);
1377 free(updvals);
1378 free(tmpl_idx);
1379 free(pdp_new);
1380 free(pdp_temp);
1381 return(0);
1382 }
/*
 * Try to take an exclusive lock on the whole RRD file without blocking.
 *
 * rrdfile: open stream for the RRD; its descriptor is obtained via fileno().
 *
 * Non-Windows builds request a write lock with fcntl(F_SETLK); Windows
 * builds query the file size with _fstat() and lock that many bytes with
 * _locking(_LK_NBLCK).
 *
 * The lock is not released here; it goes away when the file is closed.
 *
 * Returns 0 on success, otherwise the failing status of the locking call
 * (-1 on Windows when the file size cannot be determined).
 */
int
LockRRD(FILE *rrdfile)
{
    int lock_status;
    int fd = fileno(rrdfile);
#ifndef WIN32
    struct flock whole_file;

    whole_file.l_whence = SEEK_SET;   /* l_start is relative to start of file */
    whole_file.l_start = 0;           /* begin at the first byte */
    whole_file.l_len = 0;             /* zero length: lock through end of file */
    whole_file.l_type = F_WRLCK;      /* exclusive write lock */

    lock_status = fcntl(fd, F_SETLK, &whole_file);
#else
    struct _stat file_info;

    if (_fstat(fd, &file_info) != 0) {
        lock_status = -1;
    } else {
        lock_status = _locking(fd, _LK_NBLCK, file_info.st_size);
    }
#endif

    return lock_status;
}
1422 info_t
1423 *write_RRA_row (rrd_t *rrd, unsigned long rra_idx, unsigned long *rra_current,
1424 unsigned short CDP_scratch_idx, FILE *rrd_file,
1425 info_t *pcdp_summary, time_t *rra_time)
1426 {
1427 unsigned long ds_idx, cdp_idx;
1428 infoval iv;
1430 for (ds_idx = 0; ds_idx < rrd -> stat_head -> ds_cnt; ds_idx++)
1431 {
1432 /* compute the cdp index */
1433 cdp_idx =rra_idx * (rrd -> stat_head->ds_cnt) + ds_idx;
1434 #ifdef DEBUG
1435 fprintf(stderr," -- RRA WRITE VALUE %e, at %ld CF:%s\n",
1436 rrd -> cdp_prep[cdp_idx].scratch[CDP_scratch_idx].u_val,ftell(rrd_file),
1437 rrd -> rra_def[rra_idx].cf_nam);
1438 #endif
1439 if (pcdp_summary != NULL)
1440 {
1441 iv.u_val = rrd -> cdp_prep[cdp_idx].scratch[CDP_scratch_idx].u_val;
1442 /* append info to the return hash */
1443 pcdp_summary = info_push(pcdp_summary,
1444 sprintf_alloc("[%d]RRA[%lu]DS[%s]",
1445 *rra_time, rra_idx, rrd->ds_def[ds_idx].ds_nam),
1446 RD_I_VAL, iv);
1447 }
1448 if(fwrite(&(rrd -> cdp_prep[cdp_idx].scratch[CDP_scratch_idx].u_val),
1449 sizeof(rrd_value_t),1,rrd_file) != 1)
1450 {
1451 rrd_set_error("writing rrd");
1452 return;
1453 }
1454 *rra_current += sizeof(rrd_value_t);
1455 }
1456 return (pcdp_summary);
1457 }