de708df7564ff9fab0f6c30f6e4686c0b71d3f2a
1 /*****************************************************************************
2 * RRDtool 1.1.x Copyright Tobias Oetiker, 1997 - 2002
3 *****************************************************************************
4 * rrd_update.c RRD Update Function
5 *****************************************************************************
6 * $Id$
7 * $Log$
8 * Revision 1.14 2003/11/11 19:46:21 oetiker
9 * replaced time_value with rrd_time_value as MacOS X introduced a struct of that name in their standard headers
10 *
11 * Revision 1.13 2003/11/11 19:38:03 oetiker
12 * rrd files should NOT change size ever ... bulk update code was buggy.
13 * -- David M. Grimes <dgrimes@navisite.com>
14 *
15 * Revision 1.12 2003/09/04 13:16:12 oetiker
16 * should not assign but compare ... grrrrr
17 *
18 * Revision 1.11 2003/09/02 21:58:35 oetiker
19 * be pickier about what we accept in rrd_update. Complain if things do not work out
20 *
21 * Revision 1.10 2003/04/29 19:14:12 jake
22 * Change updatev RRA return from index_number to cf_nam, pdp_cnt.
23 * Also revert accidental addition of -I to aclocal MakeMakefile.
24 *
25 * Revision 1.9 2003/04/25 18:35:08 jake
26 * Alternate update interface, updatev. Returns info about CDPs written to disk as result of update. Output format is similar to rrd_info, a hash of key-values.
27 *
28 * Revision 1.8 2003/03/31 21:22:12 oetiker
29 * enables RRDtool updates with microsecond or in case of windows millisecond
30 * precision. This is needed to reduce time measurement error when archive step
31 * is small. (<30s) -- Sasha Mikheev <sasha@avalon-net.co.il>
32 *
33 * Revision 1.7 2003/02/13 07:05:27 oetiker
34 * Find attached the patch I promised to send to you. Please note that there
35 * are three new source files (src/rrd_is_thread_safe.h, src/rrd_thread_safe.c
36 * and src/rrd_not_thread_safe.c) and the introduction of librrd_th. This
37 * library is identical to librrd, but it contains support code for per-thread
38 * global variables currently used for error information only. This is similar
39 * to how errno per-thread variables are implemented. librrd_th must be linked
40 * alongside of libpthread
41 *
42 * There is also a new file "THREADS", holding some documentation.
43 *
44 * -- Peter Stamfest <peter@stamfest.at>
45 *
46 * Revision 1.6 2002/02/01 20:34:49 oetiker
47 * fixed version number and date/time
48 *
49 * Revision 1.5 2001/05/09 05:31:01 oetiker
50 * Bug fix: when update of multiple PDP/CDP RRAs coincided
51 * with interpolation of multiple PDPs an incorrect value was
52 * stored as the CDP. Especially evident for GAUGE data sources.
53 * Minor changes to rrdcreate.pod. -- Jake Brutlag <jakeb@corp.webtv.net>
54 *
55 * Revision 1.4 2001/03/10 23:54:41 oetiker
56 * Support for COMPUTE data sources (CDEF data sources). Removes the RPN
57 * parser and calculator from rrd_graph and puts then in a new file,
58 * rrd_rpncalc.c. Changes to core files rrd_create and rrd_update. Some
59 * clean-up of aberrant behavior stuff, including a bug fix.
60 * Documentation update (rrdcreate.pod, rrdupdate.pod). Change xml format.
61 * -- Jake Brutlag <jakeb@corp.webtv.net>
62 *
63 * Revision 1.3 2001/03/04 13:01:55 oetiker
64 * Aberrant Behavior Detection support. A brief overview added to rrdtool.pod.
65 * Major updates to rrd_update.c, rrd_create.c. Minor update to other core files.
66 * This is backwards compatible! But new files using the Aberrant stuff are not readable
67 * by old rrdtool versions. See http://cricket.sourceforge.net/aberrant/rrd_hw.htm
68 * -- Jake Brutlag <jakeb@corp.webtv.net>
69 *
70 * Revision 1.2 2001/03/04 11:14:25 oetiker
71 * added at-style-time@value:value syntax to rrd_update
72 * -- Dave Bodenstab <imdave@mcs.net>
73 *
74 * Revision 1.1.1.1 2001/02/25 22:25:06 oetiker
75 * checkin
76 *
77 *****************************************************************************/
79 #include "rrd_tool.h"
80 #include <sys/types.h>
81 #include <fcntl.h>
83 #ifdef WIN32
84 #include <sys/locking.h>
85 #include <sys/stat.h>
86 #include <io.h>
87 #endif
89 #include "rrd_hw.h"
90 #include "rrd_rpncalc.h"
92 #include "rrd_is_thread_safe.h"
94 #ifdef WIN32
95 /*
96 * WIN32 does not have gettimeofday and struct timeval. This is a quick and dirty
97 * replacement.
98 */
99 #include <sys/timeb.h>
/* Minimal stand-in for the POSIX struct timeval (WIN32 builds only). */
struct timeval {
    time_t tv_sec;   /* seconds */
    long   tv_usec;  /* microseconds */
};

/* Placeholder for the timezone argument of gettimeofday(); the
 * replacement below never reads or writes it. */
struct __timezone {
    int tz_minuteswest; /* minutes W of Greenwich */
    int tz_dsttime;     /* type of dst correction */
};

/*
 * Replacement for gettimeofday() built on _ftime().
 *
 * t  - receives the current time.  tv_usec is computed from the
 *      millisecond field of struct timeb, so it is always a multiple
 *      of 1000.
 * tz - accepted for signature compatibility only; ignored.
 *
 * Always returns 0.
 */
static int gettimeofday(struct timeval *t, struct __timezone *tz)
{
    struct timeb current_time;

    (void) tz;                  /* intentionally unused */

    _ftime(&current_time);

    t->tv_sec = current_time.time;
    t->tv_usec = current_time.millitm * 1000;
    return 0;
}
121 #endif
/*
 * Normalize a time value as returned by gettimeofday(): when the
 * microsecond part is negative, borrow exactly one second, i.e. add
 * 1000000 to tv_usec and decrement tv_sec once.  Values whose tv_usec
 * is already >= 0 are left untouched.
 */
static void normalize_time(struct timeval *t)
{
    if (t->tv_usec >= 0)
        return;

    t->tv_usec += 1000000L;
    t->tv_sec -= 1;
}
134 /* Local prototypes */
135 int LockRRD(FILE *rrd_file);
136 info_t *write_RRA_row (rrd_t *rrd, unsigned long rra_idx,
137 unsigned long *rra_current,
138 unsigned short CDP_scratch_idx, FILE *rrd_file,
139 info_t *pcdp_summary, time_t *rra_time);
140 int rrd_update_r(char *filename, char *template, int argc, char **argv);
141 int _rrd_update(char *filename, char *template, int argc, char **argv,
142 info_t*);
/* Yield X unless it is NaN, in which case yield the fallback Y.
 * X is expanded twice (once in the test, once as the result), so it
 * must be free of side effects.  The expansion carries no trailing
 * semicolon, so the macro works inside larger expressions as well as
 * in plain "v = IFDNAN(a, b);" statements. */
#define IFDNAN(X,Y) (isnan(X) ? (Y) : (X))
147 #ifdef STANDALONE
/*
 * Entry point of the stand-alone rrdupdate program (STANDALONE builds).
 *
 * Passes the command line to rrd_update().  If the library error flag
 * is set afterwards, prints the usage text followed by the error
 * message, clears the error and returns 1; otherwise returns 0.
 */
int
main(int argc, char **argv)
{
    rrd_update(argc, argv);

    /* nothing flagged: the update went through */
    if (!rrd_test_error())
        return 0;

    printf("RRDtool 1.1.x Copyright 1997-2000 by Tobias Oetiker <tobi@oetiker.ch>\n\n"
           "Usage: rrdupdate filename\n"
           "\t\t\t[--template|-t ds-name:ds-name:...]\n"
           "\t\t\ttime|N:value[:value...]\n\n"
           "\t\t\tat-time@value[:value...]\n\n"
           "\t\t\t[ time:value[:value...] ..]\n\n");

    printf("ERROR: %s\n", rrd_get_error());
    rrd_clear_error();
    return 1;
}
165 #endif
167 info_t *rrd_update_v(int argc, char **argv)
168 {
169 char *template = NULL;
170 info_t *result = NULL;
171 infoval rc;
173 while (1) {
174 static struct option long_options[] =
175 {
176 {"template", required_argument, 0, 't'},
177 {0,0,0,0}
178 };
179 int option_index = 0;
180 int opt;
181 opt = getopt_long(argc, argv, "t:",
182 long_options, &option_index);
184 if (opt == EOF)
185 break;
187 switch(opt) {
188 case 't':
189 template = optarg;
190 break;
192 case '?':
193 rrd_set_error("unknown option '%s'",argv[optind-1]);
194 rc.u_int = -1;
195 goto end_tag;
196 }
197 }
199 /* need at least 2 arguments: filename, data. */
200 if (argc-optind < 2) {
201 rrd_set_error("Not enough arguments");
202 rc.u_int = -1;
203 goto end_tag;
204 }
205 result = info_push(NULL,sprintf_alloc("return_value"),RD_I_INT,rc);
206 rc.u_int = _rrd_update(argv[optind], template,
207 argc - optind - 1, argv + optind + 1, result);
208 result->value.u_int = rc.u_int;
209 end_tag:
210 return result;
211 }
213 int
214 rrd_update(int argc, char **argv)
215 {
216 char *template = NULL;
217 int rc;
219 while (1) {
220 static struct option long_options[] =
221 {
222 {"template", required_argument, 0, 't'},
223 {0,0,0,0}
224 };
225 int option_index = 0;
226 int opt;
227 opt = getopt_long(argc, argv, "t:",
228 long_options, &option_index);
230 if (opt == EOF)
231 break;
233 switch(opt) {
234 case 't':
235 template = optarg;
236 break;
238 case '?':
239 rrd_set_error("unknown option '%s'",argv[optind-1]);
240 return(-1);
241 }
242 }
244 /* need at least 2 arguments: filename, data. */
245 if (argc-optind < 2) {
246 rrd_set_error("Not enough arguments");
248 return -1;
249 }
251 rc = rrd_update_r(argv[optind], template,
252 argc - optind - 1, argv + optind + 1);
253 return rc;
254 }
256 int
257 rrd_update_r(char *filename, char *template, int argc, char **argv)
258 {
259 return _rrd_update(filename, template, argc, argv, NULL);
260 }
262 int
263 _rrd_update(char *filename, char *template, int argc, char **argv,
264 info_t *pcdp_summary)
265 {
267 int arg_i = 2;
268 short j;
269 unsigned long i,ii,iii=1;
271 unsigned long rra_begin; /* byte pointer to the rra
272 * area in the rrd file. this
273 * pointer never changes value */
274 unsigned long rra_start; /* byte pointer to the rra
275 * area in the rrd file. this
276 * pointer changes as each rrd is
277 * processed. */
278 unsigned long rra_current; /* byte pointer to the current write
279 * spot in the rrd file. */
280 unsigned long rra_pos_tmp; /* temporary byte pointer. */
281 double interval,
282 pre_int,post_int; /* interval between this and
283 * the last run */
284 unsigned long proc_pdp_st; /* which pdp_st was the last
285 * to be processed */
286 unsigned long occu_pdp_st; /* when was the pdp_st
287 * before the last update
288 * time */
289 unsigned long proc_pdp_age; /* how old was the data in
290 * the pdp prep area when it
291 * was last updated */
292 unsigned long occu_pdp_age; /* how long ago was the last
293 * pdp_step time */
294 rrd_value_t *pdp_new; /* prepare the incoming data
295 * to be added the the
296 * existing entry */
297 rrd_value_t *pdp_temp; /* prepare the pdp values
298 * to be added the the
299 * cdp values */
301 long *tmpl_idx; /* index representing the settings
302 transported by the template index */
303 unsigned long tmpl_cnt = 2; /* time and data */
305 FILE *rrd_file;
306 rrd_t rrd;
307 time_t current_time;
308 time_t rra_time; /* time of update for a RRA */
309 unsigned long current_time_usec; /* microseconds part of current time */
310 struct timeval tmp_time; /* used for time conversion */
312 char **updvals;
313 int schedule_smooth = 0;
314 rrd_value_t *seasonal_coef = NULL, *last_seasonal_coef = NULL;
315 /* a vector of future Holt-Winters seasonal coefs */
316 unsigned long elapsed_pdp_st;
317 /* number of elapsed PDP steps since last update */
318 unsigned long *rra_step_cnt = NULL;
319 /* number of rows to be updated in an RRA for a data
320 * value. */
321 unsigned long start_pdp_offset;
322 /* number of PDP steps since the last update that
323 * are assigned to the first CDP to be generated
324 * since the last update. */
325 unsigned short scratch_idx;
326 /* index into the CDP scratch array */
327 enum cf_en current_cf;
328 /* numeric id of the current consolidation function */
329 rpnstack_t rpnstack; /* used for COMPUTE DS */
330 int version; /* rrd version */
331 char *endptr; /* used in the conversion */
333 rpnstack_init(&rpnstack);
335 /* need at least 1 arguments: data. */
336 if (argc < 1) {
337 rrd_set_error("Not enough arguments");
338 return -1;
339 }
343 if(rrd_open(filename,&rrd_file,&rrd, RRD_READWRITE)==-1){
344 return -1;
345 }
346 /* initialize time */
347 version = atoi(rrd.stat_head->version);
348 gettimeofday(&tmp_time, 0);
349 normalize_time(&tmp_time);
350 current_time = tmp_time.tv_sec;
351 if(version >= 3) {
352 current_time_usec = tmp_time.tv_usec;
353 }
354 else {
355 current_time_usec = 0;
356 }
358 rra_current = rra_start = rra_begin = ftell(rrd_file);
359 /* This is defined in the ANSI C standard, section 7.9.5.3:
361 When a file is opened with update mode ('+' as the second
362 or third character in the ... list of mode argument
363 variables), both input and output may be performed on the
364 associated stream. However, ... input may not be directly
365 followed by output without an intervening call to a file
366 positioning function, unless the input operation encounters
367 end-of-file. */
368 fseek(rrd_file, 0, SEEK_CUR);
371 /* get exclusive lock to whole file.
372 * lock gets removed when we close the file.
373 */
374 if (LockRRD(rrd_file) != 0) {
375 rrd_set_error("could not lock RRD");
376 rrd_free(&rrd);
377 fclose(rrd_file);
378 return(-1);
379 }
381 if((updvals = malloc( sizeof(char*) * (rrd.stat_head->ds_cnt+1)))==NULL){
382 rrd_set_error("allocating updvals pointer array");
383 rrd_free(&rrd);
384 fclose(rrd_file);
385 return(-1);
386 }
388 if ((pdp_temp = malloc(sizeof(rrd_value_t)
389 *rrd.stat_head->ds_cnt))==NULL){
390 rrd_set_error("allocating pdp_temp ...");
391 free(updvals);
392 rrd_free(&rrd);
393 fclose(rrd_file);
394 return(-1);
395 }
397 if ((tmpl_idx = malloc(sizeof(unsigned long)
398 *(rrd.stat_head->ds_cnt+1)))==NULL){
399 rrd_set_error("allocating tmpl_idx ...");
400 free(pdp_temp);
401 free(updvals);
402 rrd_free(&rrd);
403 fclose(rrd_file);
404 return(-1);
405 }
406 /* initialize template redirector */
407 /* default config example (assume DS 1 is a CDEF DS)
408 tmpl_idx[0] -> 0; (time)
409 tmpl_idx[1] -> 1; (DS 0)
410 tmpl_idx[2] -> 3; (DS 2)
411 tmpl_idx[3] -> 4; (DS 3) */
412 tmpl_idx[0] = 0; /* time */
413 for (i = 1, ii = 1 ; i <= rrd.stat_head->ds_cnt ; i++)
414 {
415 if (dst_conv(rrd.ds_def[i-1].dst) != DST_CDEF)
416 tmpl_idx[ii++]=i;
417 }
418 tmpl_cnt= ii;
420 if (template) {
421 char *dsname;
422 unsigned int tmpl_len;
423 dsname = template;
424 tmpl_cnt = 1; /* the first entry is the time */
425 tmpl_len = strlen(template);
426 for(i=0;i<=tmpl_len ;i++) {
427 if (template[i] == ':' || template[i] == '\0') {
428 template[i] = '\0';
429 if (tmpl_cnt>rrd.stat_head->ds_cnt){
430 rrd_set_error("Template contains more DS definitions than RRD");
431 free(updvals); free(pdp_temp);
432 free(tmpl_idx); rrd_free(&rrd);
433 fclose(rrd_file); return(-1);
434 }
435 if ((tmpl_idx[tmpl_cnt++] = ds_match(&rrd,dsname)) == -1){
436 rrd_set_error("unknown DS name '%s'",dsname);
437 free(updvals); free(pdp_temp);
438 free(tmpl_idx); rrd_free(&rrd);
439 fclose(rrd_file); return(-1);
440 } else {
441 /* the first element is always the time */
442 tmpl_idx[tmpl_cnt-1]++;
443 /* go to the next entry on the template */
444 dsname = &template[i+1];
445 /* fix the damage we did before */
446 if (i<tmpl_len) {
447 template[i]=':';
448 }
450 }
451 }
452 }
453 }
454 if ((pdp_new = malloc(sizeof(rrd_value_t)
455 *rrd.stat_head->ds_cnt))==NULL){
456 rrd_set_error("allocating pdp_new ...");
457 free(updvals);
458 free(pdp_temp);
459 free(tmpl_idx);
460 rrd_free(&rrd);
461 fclose(rrd_file);
462 return(-1);
463 }
465 /* loop through the arguments. */
466 for(arg_i=0; arg_i<argc;arg_i++) {
467 char *stepper = malloc((strlen(argv[arg_i])+1)*sizeof(char));
468 char *step_start = stepper;
469 char *p;
470 char *parsetime_error = NULL;
471 enum {atstyle, normal} timesyntax;
472 struct rrd_time_value ds_tv;
473 if (stepper == NULL){
474 rrd_set_error("failed duplication argv entry");
475 free(updvals);
476 free(pdp_temp);
477 free(tmpl_idx);
478 rrd_free(&rrd);
479 fclose(rrd_file);
480 return(-1);
481 }
482 /* initialize all ds input to unknown except the first one
483 which has always got to be set */
484 for(ii=1;ii<=rrd.stat_head->ds_cnt;ii++) updvals[ii] = "U";
485 strcpy(stepper,argv[arg_i]);
486 updvals[0]=stepper;
487 /* separate all ds elements; first must be examined separately
488 due to alternate time syntax */
489 if ((p=strchr(stepper,'@'))!=NULL) {
490 timesyntax = atstyle;
491 *p = '\0';
492 stepper = p+1;
493 } else if ((p=strchr(stepper,':'))!=NULL) {
494 timesyntax = normal;
495 *p = '\0';
496 stepper = p+1;
497 } else {
498 rrd_set_error("expected timestamp not found in data source from %s:...",
499 argv[arg_i]);
500 free(step_start);
501 break;
502 }
503 ii=1;
504 updvals[tmpl_idx[ii]] = stepper;
505 while (*stepper) {
506 if (*stepper == ':') {
507 *stepper = '\0';
508 ii++;
509 if (ii<tmpl_cnt){
510 updvals[tmpl_idx[ii]] = stepper+1;
511 }
512 }
513 stepper++;
514 }
516 if (ii != tmpl_cnt-1) {
517 rrd_set_error("expected %lu data source readings (got %lu) from %s:...",
518 tmpl_cnt-1, ii, argv[arg_i]);
519 free(step_start);
520 break;
521 }
523 /* get the time from the reading ... handle N */
524 if (timesyntax == atstyle) {
525 if ((parsetime_error = parsetime(updvals[0], &ds_tv))) {
526 rrd_set_error("ds time: %s: %s", updvals[0], parsetime_error );
527 free(step_start);
528 break;
529 }
530 if (ds_tv.type == RELATIVE_TO_END_TIME ||
531 ds_tv.type == RELATIVE_TO_START_TIME) {
532 rrd_set_error("specifying time relative to the 'start' "
533 "or 'end' makes no sense here: %s",
534 updvals[0]);
535 free(step_start);
536 break;
537 }
539 current_time = mktime(&ds_tv.tm) + ds_tv.offset;
540 current_time_usec = 0; /* FIXME: how to handle usecs here ? */
542 } else if (strcmp(updvals[0],"N")==0){
543 gettimeofday(&tmp_time, 0);
544 normalize_time(&tmp_time);
545 current_time = tmp_time.tv_sec;
546 current_time_usec = tmp_time.tv_usec;
547 } else {
548 double tmp;
549 tmp = strtod(updvals[0], 0);
550 current_time = floor(tmp);
551 current_time_usec = (long)((tmp - current_time) * 1000000L);
552 }
553 /* dont do any correction for old version RRDs */
554 if(version < 3)
555 current_time_usec = 0;
557 if(current_time <= rrd.live_head->last_up){
558 rrd_set_error("illegal attempt to update using time %ld when "
559 "last update time is %ld (minimum one second step)",
560 current_time, rrd.live_head->last_up);
561 free(step_start);
562 break;
563 }
566 /* seek to the beginning of the rra's */
567 if (rra_current != rra_begin) {
568 if(fseek(rrd_file, rra_begin, SEEK_SET) != 0) {
569 rrd_set_error("seek error in rrd");
570 free(step_start);
571 break;
572 }
573 rra_current = rra_begin;
574 }
575 rra_start = rra_begin;
577 /* when was the current pdp started */
578 proc_pdp_age = rrd.live_head->last_up % rrd.stat_head->pdp_step;
579 proc_pdp_st = rrd.live_head->last_up - proc_pdp_age;
581 /* when did the last pdp_st occur */
582 occu_pdp_age = current_time % rrd.stat_head->pdp_step;
583 occu_pdp_st = current_time - occu_pdp_age;
584 /* interval = current_time - rrd.live_head->last_up; */
585 interval = current_time + ((double)current_time_usec - (double)rrd.live_head->last_up_usec)/1000000.0 - rrd.live_head->last_up;
587 if (occu_pdp_st > proc_pdp_st){
588 /* OK we passed the pdp_st moment*/
589 pre_int = occu_pdp_st - rrd.live_head->last_up; /* how much of the input data
590 * occurred before the latest
591 * pdp_st moment*/
592 pre_int -= ((double)rrd.live_head->last_up_usec)/1000000.0; /* adjust usecs */
593 post_int = occu_pdp_age; /* how much after it */
594 post_int += ((double)current_time_usec)/1000000.0; /* adjust usecs */
595 } else {
596 pre_int = interval;
597 post_int = 0;
598 }
600 #ifdef DEBUG
601 printf(
602 "proc_pdp_age %lu\t"
603 "proc_pdp_st %lu\t"
604 "occu_pfp_age %lu\t"
605 "occu_pdp_st %lu\t"
606 "int %lf\t"
607 "pre_int %lf\t"
608 "post_int %lf\n", proc_pdp_age, proc_pdp_st,
609 occu_pdp_age, occu_pdp_st,
610 interval, pre_int, post_int);
611 #endif
613 /* process the data sources and update the pdp_prep
614 * area accordingly */
615 for(i=0;i<rrd.stat_head->ds_cnt;i++){
616 enum dst_en dst_idx;
617 dst_idx= dst_conv(rrd.ds_def[i].dst);
618 /* NOTE: DST_CDEF should never enter this if block, because
619 * updvals[i+1][0] is initialized to 'U'; unless the caller
620 * accidently specified a value for the DST_CDEF. To handle
621 * this case, an extra check is required. */
622 if((updvals[i+1][0] != 'U') &&
623 (dst_idx != DST_CDEF) &&
624 rrd.ds_def[i].par[DS_mrhb_cnt].u_cnt >= interval) {
625 double rate = DNAN;
626 /* the data source type defines how to process the data */
627 /* pdp_new contains rate * time ... eg the bytes
628 * transferred during the interval. Doing it this way saves
629 * a lot of math operations */
632 switch(dst_idx){
633 case DST_COUNTER:
634 case DST_DERIVE:
635 if(rrd.pdp_prep[i].last_ds[0] != 'U'){
636 for(ii=0;updvals[i+1][ii] != '\0';ii++){
637 if(updvals[i+1][ii] < '0' || updvals[i+1][ii] > '9' || (ii==0 && updvals[i+1][ii] == '-')){
638 rrd_set_error("not a simple integer: '%s'",updvals[i+1]);
639 break;
640 }
641 }
642 if (rrd_test_error()){
643 break;
644 }
645 pdp_new[i]= rrd_diff(updvals[i+1],rrd.pdp_prep[i].last_ds);
646 if(dst_idx == DST_COUNTER) {
647 /* simple overflow catcher sugestet by andres kroonmaa */
648 /* this will fail terribly for non 32 or 64 bit counters ... */
649 /* are there any others in SNMP land ? */
650 if (pdp_new[i] < (double)0.0 )
651 pdp_new[i] += (double)4294967296.0 ; /* 2^32 */
652 if (pdp_new[i] < (double)0.0 )
653 pdp_new[i] += (double)18446744069414584320.0; /* 2^64-2^32 */;
654 }
655 rate = pdp_new[i] / interval;
656 }
657 else {
658 pdp_new[i]= DNAN;
659 }
660 break;
661 case DST_ABSOLUTE:
662 errno = 0;
663 pdp_new[i] = strtod(updvals[i+1],&endptr);
664 if (errno > 0){
665 rrd_set_error("converting '%s' to float: %s",updvals[i+1],rrd_strerror(errno));
666 break;
667 };
668 if (endptr[0] != '\0'){
669 rrd_set_error("conversion of '%s' to float not complete: tail '%s'",updvals[i+1],endptr);
670 break;
671 }
672 rate = pdp_new[i] / interval;
673 break;
674 case DST_GAUGE:
675 errno = 0;
676 pdp_new[i] = strtod(updvals[i+1],&endptr) * interval;
677 if (errno > 0){
678 rrd_set_error("converting '%s' to float: %s",updvals[i+1],rrd_strerror(errno));
679 break;
680 };
681 if (endptr[0] != '\0'){
682 rrd_set_error("conversion of '%s' to float not complete: tail '%s'",updvals[i+1],endptr);
683 break;
684 }
685 rate = pdp_new[i] / interval;
686 break;
687 default:
688 rrd_set_error("rrd contains unknown DS type : '%s'",
689 rrd.ds_def[i].dst);
690 break;
691 }
692 /* break out of this for loop if the error string is set */
693 if (rrd_test_error()){
694 break;
695 }
696 /* make sure pdp_temp is neither too large or too small
697 * if any of these occur it becomes unknown ...
698 * sorry folks ... */
699 if ( ! isnan(rate) &&
700 (( ! isnan(rrd.ds_def[i].par[DS_max_val].u_val) &&
701 rate > rrd.ds_def[i].par[DS_max_val].u_val ) ||
702 ( ! isnan(rrd.ds_def[i].par[DS_min_val].u_val) &&
703 rate < rrd.ds_def[i].par[DS_min_val].u_val ))){
704 pdp_new[i] = DNAN;
705 }
706 } else {
707 /* no news is news all the same */
708 pdp_new[i] = DNAN;
709 }
711 /* make a copy of the command line argument for the next run */
712 #ifdef DEBUG
713 fprintf(stderr,
714 "prep ds[%lu]\t"
715 "last_arg '%s'\t"
716 "this_arg '%s'\t"
717 "pdp_new %10.2f\n",
718 i,
719 rrd.pdp_prep[i].last_ds,
720 updvals[i+1], pdp_new[i]);
721 #endif
722 if(dst_idx == DST_COUNTER || dst_idx == DST_DERIVE){
723 strncpy(rrd.pdp_prep[i].last_ds,
724 updvals[i+1],LAST_DS_LEN-1);
725 rrd.pdp_prep[i].last_ds[LAST_DS_LEN-1]='\0';
726 }
727 }
728 /* break out of the argument parsing loop if the error_string is set */
729 if (rrd_test_error()){
730 free(step_start);
731 break;
732 }
733 /* has a pdp_st moment occurred since the last run ? */
735 if (proc_pdp_st == occu_pdp_st){
736 /* no we have not passed a pdp_st moment. therefore update is simple */
738 for(i=0;i<rrd.stat_head->ds_cnt;i++){
739 if(isnan(pdp_new[i]))
740 rrd.pdp_prep[i].scratch[PDP_unkn_sec_cnt].u_cnt += interval;
741 else
742 rrd.pdp_prep[i].scratch[PDP_val].u_val+= pdp_new[i];
743 #ifdef DEBUG
744 fprintf(stderr,
745 "NO PDP ds[%lu]\t"
746 "value %10.2f\t"
747 "unkn_sec %5lu\n",
748 i,
749 rrd.pdp_prep[i].scratch[PDP_val].u_val,
750 rrd.pdp_prep[i].scratch[PDP_unkn_sec_cnt].u_cnt);
751 #endif
752 }
753 } else {
754 /* an pdp_st has occurred. */
756 /* in pdp_prep[].scratch[PDP_val].u_val we have collected rate*seconds which
757 * occurred up to the last run.
758 pdp_new[] contains rate*seconds from the latest run.
759 pdp_temp[] will contain the rate for cdp */
761 for(i=0;i<rrd.stat_head->ds_cnt;i++){
762 /* update pdp_prep to the current pdp_st */
763 if(isnan(pdp_new[i]))
764 rrd.pdp_prep[i].scratch[PDP_unkn_sec_cnt].u_cnt += pre_int;
765 else
766 rrd.pdp_prep[i].scratch[PDP_val].u_val +=
767 pdp_new[i]/(double)interval*(double)pre_int;
769 /* if too much of the pdp_prep is unknown we dump it */
770 if ((rrd.pdp_prep[i].scratch[PDP_unkn_sec_cnt].u_cnt
771 > rrd.ds_def[i].par[DS_mrhb_cnt].u_cnt) ||
772 (occu_pdp_st-proc_pdp_st <=
773 rrd.pdp_prep[i].scratch[PDP_unkn_sec_cnt].u_cnt)) {
774 pdp_temp[i] = DNAN;
775 } else {
776 pdp_temp[i] = rrd.pdp_prep[i].scratch[PDP_val].u_val
777 / (double)( occu_pdp_st
778 - proc_pdp_st
779 - rrd.pdp_prep[i].scratch[PDP_unkn_sec_cnt].u_cnt);
780 }
782 /* process CDEF data sources; remember each CDEF DS can
783 * only reference other DS with a lower index number */
784 if (dst_conv(rrd.ds_def[i].dst) == DST_CDEF) {
785 rpnp_t *rpnp;
786 rpnp = rpn_expand((rpn_cdefds_t *) &(rrd.ds_def[i].par[DS_cdef]));
787 /* substitue data values for OP_VARIABLE nodes */
788 for (ii = 0; rpnp[ii].op != OP_END; ii++)
789 {
790 if (rpnp[ii].op == OP_VARIABLE) {
791 rpnp[ii].op = OP_NUMBER;
792 rpnp[ii].val = pdp_temp[rpnp[ii].ptr];
793 }
794 }
795 /* run the rpn calculator */
796 if (rpn_calc(rpnp,&rpnstack,0,pdp_temp,i) == -1) {
797 free(rpnp);
798 break; /* exits the data sources pdp_temp loop */
799 }
800 }
802 /* make pdp_prep ready for the next run */
803 if(isnan(pdp_new[i])){
804 rrd.pdp_prep[i].scratch[PDP_unkn_sec_cnt].u_cnt = post_int;
805 rrd.pdp_prep[i].scratch[PDP_val].u_val = 0.0;
806 } else {
807 rrd.pdp_prep[i].scratch[PDP_unkn_sec_cnt].u_cnt = 0;
808 rrd.pdp_prep[i].scratch[PDP_val].u_val =
809 pdp_new[i]/(double)interval*(double)post_int;
810 }
812 #ifdef DEBUG
813 fprintf(stderr,
814 "PDP UPD ds[%lu]\t"
815 "pdp_temp %10.2f\t"
816 "new_prep %10.2f\t"
817 "new_unkn_sec %5lu\n",
818 i, pdp_temp[i],
819 rrd.pdp_prep[i].scratch[PDP_val].u_val,
820 rrd.pdp_prep[i].scratch[PDP_unkn_sec_cnt].u_cnt);
821 #endif
822 }
824 /* if there were errors during the last loop, bail out here */
825 if (rrd_test_error()){
826 free(step_start);
827 break;
828 }
830 /* compute the number of elapsed pdp_st moments */
831 elapsed_pdp_st = (occu_pdp_st - proc_pdp_st) / rrd.stat_head -> pdp_step;
832 #ifdef DEBUG
833 fprintf(stderr,"elapsed PDP steps: %lu\n", elapsed_pdp_st);
834 #endif
835 if (rra_step_cnt == NULL)
836 {
837 rra_step_cnt = (unsigned long *)
838 malloc((rrd.stat_head->rra_cnt)* sizeof(unsigned long));
839 }
841 for(i = 0, rra_start = rra_begin;
842 i < rrd.stat_head->rra_cnt;
843 rra_start += rrd.rra_def[i].row_cnt * rrd.stat_head -> ds_cnt * sizeof(rrd_value_t),
844 i++)
845 {
846 current_cf = cf_conv(rrd.rra_def[i].cf_nam);
847 start_pdp_offset = rrd.rra_def[i].pdp_cnt -
848 (proc_pdp_st / rrd.stat_head -> pdp_step) % rrd.rra_def[i].pdp_cnt;
849 if (start_pdp_offset <= elapsed_pdp_st) {
850 rra_step_cnt[i] = (elapsed_pdp_st - start_pdp_offset) /
851 rrd.rra_def[i].pdp_cnt + 1;
852 } else {
853 rra_step_cnt[i] = 0;
854 }
856 if (current_cf == CF_SEASONAL || current_cf == CF_DEVSEASONAL)
857 {
858 /* If this is a bulk update, we need to skip ahead in the seasonal
859 * arrays so that they will be correct for the next observed value;
860 * note that for the bulk update itself, no update will occur to
861 * DEVSEASONAL or SEASONAL; futhermore, HWPREDICT and DEVPREDICT will
862 * be set to DNAN. */
863 if (rra_step_cnt[i] > 2)
864 {
865 /* skip update by resetting rra_step_cnt[i],
866 * note that this is not data source specific; this is due
867 * to the bulk update, not a DNAN value for the specific data
868 * source. */
869 rra_step_cnt[i] = 0;
870 lookup_seasonal(&rrd,i,rra_start,rrd_file,elapsed_pdp_st,
871 &last_seasonal_coef);
872 lookup_seasonal(&rrd,i,rra_start,rrd_file,elapsed_pdp_st + 1,
873 &seasonal_coef);
874 }
876 /* periodically run a smoother for seasonal effects */
877 /* Need to use first cdp parameter buffer to track
878 * burnin (burnin requires a specific smoothing schedule).
879 * The CDP_init_seasonal parameter is really an RRA level,
880 * not a data source within RRA level parameter, but the rra_def
881 * is read only for rrd_update (not flushed to disk). */
882 iii = i*(rrd.stat_head -> ds_cnt);
883 if (rrd.cdp_prep[iii].scratch[CDP_init_seasonal].u_cnt
884 <= BURNIN_CYCLES)
885 {
886 if (rrd.rra_ptr[i].cur_row + elapsed_pdp_st
887 > rrd.rra_def[i].row_cnt - 1) {
888 /* mark off one of the burnin cycles */
889 ++(rrd.cdp_prep[iii].scratch[CDP_init_seasonal].u_cnt);
890 schedule_smooth = 1;
891 }
892 } else {
893 /* someone has no doubt invented a trick to deal with this
894 * wrap around, but at least this code is clear. */
895 if (rrd.rra_def[i].par[RRA_seasonal_smooth_idx].u_cnt >
896 rrd.rra_ptr[i].cur_row)
897 {
898 /* here elapsed_pdp_st = rra_step_cnt[i] because of 1-1
899 * mapping between PDP and CDP */
900 if (rrd.rra_ptr[i].cur_row + elapsed_pdp_st
901 >= rrd.rra_def[i].par[RRA_seasonal_smooth_idx].u_cnt)
902 {
903 #ifdef DEBUG
904 fprintf(stderr,
905 "schedule_smooth 1: cur_row %lu, elapsed_pdp_st %lu, smooth idx %lu\n",
906 rrd.rra_ptr[i].cur_row, elapsed_pdp_st,
907 rrd.rra_def[i].par[RRA_seasonal_smooth_idx].u_cnt);
908 #endif
909 schedule_smooth = 1;
910 }
911 } else {
912 /* can't rely on negative numbers because we are working with
913 * unsigned values */
914 /* Don't need modulus here. If we've wrapped more than once, only
915 * one smooth is executed at the end. */
916 if (rrd.rra_ptr[i].cur_row + elapsed_pdp_st >= rrd.rra_def[i].row_cnt
917 && rrd.rra_ptr[i].cur_row + elapsed_pdp_st - rrd.rra_def[i].row_cnt
918 >= rrd.rra_def[i].par[RRA_seasonal_smooth_idx].u_cnt)
919 {
920 #ifdef DEBUG
921 fprintf(stderr,
922 "schedule_smooth 2: cur_row %lu, elapsed_pdp_st %lu, smooth idx %lu\n",
923 rrd.rra_ptr[i].cur_row, elapsed_pdp_st,
924 rrd.rra_def[i].par[RRA_seasonal_smooth_idx].u_cnt);
925 #endif
926 schedule_smooth = 1;
927 }
928 }
929 }
931 rra_current = ftell(rrd_file);
932 } /* if cf is DEVSEASONAL or SEASONAL */
934 if (rrd_test_error()) break;
936 /* update CDP_PREP areas */
937 /* loop over data soures within each RRA */
938 for(ii = 0;
939 ii < rrd.stat_head->ds_cnt;
940 ii++)
941 {
943 /* iii indexes the CDP prep area for this data source within the RRA */
944 iii=i*rrd.stat_head->ds_cnt+ii;
946 if (rrd.rra_def[i].pdp_cnt > 1) {
948 if (rra_step_cnt[i] > 0) {
949 /* If we are in this block, as least 1 CDP value will be written to
950 * disk, this is the CDP_primary_val entry. If more than 1 value needs
951 * to be written, then the "fill in" value is the CDP_secondary_val
952 * entry. */
953 if (isnan(pdp_temp[ii]))
954 {
955 rrd.cdp_prep[iii].scratch[CDP_unkn_pdp_cnt].u_cnt += start_pdp_offset;
956 rrd.cdp_prep[iii].scratch[CDP_secondary_val].u_val = DNAN;
957 } else {
958 /* CDP_secondary value is the RRA "fill in" value for intermediary
959 * CDP data entries. No matter the CF, the value is the same because
960 * the average, max, min, and last of a list of identical values is
961 * the same, namely, the value itself. */
962 rrd.cdp_prep[iii].scratch[CDP_secondary_val].u_val = pdp_temp[ii];
963 }
965 if (rrd.cdp_prep[iii].scratch[CDP_unkn_pdp_cnt].u_cnt
966 > rrd.rra_def[i].pdp_cnt*
967 rrd.rra_def[i].par[RRA_cdp_xff_val].u_val)
968 {
969 rrd.cdp_prep[iii].scratch[CDP_primary_val].u_val = DNAN;
970 /* initialize carry over */
971 if (current_cf == CF_AVERAGE) {
972 if (isnan(pdp_temp[ii])) {
973 rrd.cdp_prep[iii].scratch[CDP_val].u_val = DNAN;
974 } else {
975 rrd.cdp_prep[iii].scratch[CDP_val].u_val = pdp_temp[ii] *
976 ((elapsed_pdp_st - start_pdp_offset) % rrd.rra_def[i].pdp_cnt);
977 }
978 } else {
979 rrd.cdp_prep[iii].scratch[CDP_val].u_val = pdp_temp[ii];
980 }
981 } else {
982 rrd_value_t cum_val, cur_val;
983 switch (current_cf) {
984 case CF_AVERAGE:
985 cum_val = IFDNAN(rrd.cdp_prep[iii].scratch[CDP_val].u_val, 0.0);
986 cur_val = IFDNAN(pdp_temp[ii],0.0);
987 rrd.cdp_prep[iii].scratch[CDP_primary_val].u_val =
988 (cum_val + cur_val * start_pdp_offset) /
989 (rrd.rra_def[i].pdp_cnt
990 -rrd.cdp_prep[iii].scratch[CDP_unkn_pdp_cnt].u_cnt);
991 /* initialize carry over value */
992 if (isnan(pdp_temp[ii])) {
993 rrd.cdp_prep[iii].scratch[CDP_val].u_val = DNAN;
994 } else {
995 rrd.cdp_prep[iii].scratch[CDP_val].u_val = pdp_temp[ii] *
996 ((elapsed_pdp_st - start_pdp_offset) % rrd.rra_def[i].pdp_cnt);
997 }
998 break;
999 case CF_MAXIMUM:
1000 cum_val = IFDNAN(rrd.cdp_prep[iii].scratch[CDP_val].u_val, -DINF);
1001 cur_val = IFDNAN(pdp_temp[ii],-DINF);
1002 #ifdef DEBUG
1003 if (isnan(rrd.cdp_prep[iii].scratch[CDP_val].u_val) &&
1004 isnan(pdp_temp[ii])) {
1005 fprintf(stderr,
1006 "RRA %lu, DS %lu, both CDP_val and pdp_temp are DNAN!",
1007 i,ii);
1008 exit(-1);
1009 }
1010 #endif
1011 if (cur_val > cum_val)
1012 rrd.cdp_prep[iii].scratch[CDP_primary_val].u_val = cur_val;
1013 else
1014 rrd.cdp_prep[iii].scratch[CDP_primary_val].u_val = cum_val;
1015 /* initialize carry over value */
1016 rrd.cdp_prep[iii].scratch[CDP_val].u_val = pdp_temp[ii];
1017 break;
1018 case CF_MINIMUM:
1019 cum_val = IFDNAN(rrd.cdp_prep[iii].scratch[CDP_val].u_val, DINF);
1020 cur_val = IFDNAN(pdp_temp[ii],DINF);
1021 #ifdef DEBUG
1022 if (isnan(rrd.cdp_prep[iii].scratch[CDP_val].u_val) &&
1023 isnan(pdp_temp[ii])) {
1024 fprintf(stderr,
1025 "RRA %lu, DS %lu, both CDP_val and pdp_temp are DNAN!",
1026 i,ii);
1027 exit(-1);
1028 }
1029 #endif
1030 if (cur_val < cum_val)
1031 rrd.cdp_prep[iii].scratch[CDP_primary_val].u_val = cur_val;
1032 else
1033 rrd.cdp_prep[iii].scratch[CDP_primary_val].u_val = cum_val;
1034 /* initialize carry over value */
1035 rrd.cdp_prep[iii].scratch[CDP_val].u_val = pdp_temp[ii];
1036 break;
1037 case CF_LAST:
1038 default:
1039 rrd.cdp_prep[iii].scratch[CDP_primary_val].u_val = pdp_temp[ii];
1040 /* initialize carry over value */
1041 rrd.cdp_prep[iii].scratch[CDP_val].u_val = pdp_temp[ii];
1042 break;
1043 }
1044 } /* endif meets xff value requirement for a valid value */
1045 /* initialize carry over CDP_unkn_pdp_cnt, this must after CDP_primary_val
1046 * is set because CDP_unkn_pdp_cnt is required to compute that value. */
1047 if (isnan(pdp_temp[ii]))
1048 rrd.cdp_prep[iii].scratch[CDP_unkn_pdp_cnt].u_cnt =
1049 (elapsed_pdp_st - start_pdp_offset) % rrd.rra_def[i].pdp_cnt;
1050 else
1051 rrd.cdp_prep[iii].scratch[CDP_unkn_pdp_cnt].u_cnt = 0;
1052 } else /* rra_step_cnt[i] == 0 */
1053 {
1054 #ifdef DEBUG
1055 if (isnan(rrd.cdp_prep[iii].scratch[CDP_val].u_val)) {
1056 fprintf(stderr,"schedule CDP_val update, RRA %lu DS %lu, DNAN\n",
1057 i,ii);
1058 } else {
1059 fprintf(stderr,"schedule CDP_val update, RRA %lu DS %lu, %10.2f\n",
1060 i,ii,rrd.cdp_prep[iii].scratch[CDP_val].u_val);
1061 }
1062 #endif
1063 if (isnan(pdp_temp[ii])) {
1064 rrd.cdp_prep[iii].scratch[CDP_unkn_pdp_cnt].u_cnt += elapsed_pdp_st;
1065 } else if (isnan(rrd.cdp_prep[iii].scratch[CDP_val].u_val))
1066 {
1067 if (current_cf == CF_AVERAGE) {
1068 rrd.cdp_prep[iii].scratch[CDP_val].u_val = pdp_temp[ii] *
1069 elapsed_pdp_st;
1070 } else {
1071 rrd.cdp_prep[iii].scratch[CDP_val].u_val = pdp_temp[ii];
1072 }
1073 #ifdef DEBUG
1074 fprintf(stderr,"Initialize CDP_val for RRA %lu DS %lu: %10.2f\n",
1075 i,ii,rrd.cdp_prep[iii].scratch[CDP_val].u_val);
1076 #endif
1077 } else {
1078 switch (current_cf) {
1079 case CF_AVERAGE:
1080 rrd.cdp_prep[iii].scratch[CDP_val].u_val += pdp_temp[ii] *
1081 elapsed_pdp_st;
1082 break;
1083 case CF_MINIMUM:
1084 if (pdp_temp[ii] < rrd.cdp_prep[iii].scratch[CDP_val].u_val)
1085 rrd.cdp_prep[iii].scratch[CDP_val].u_val = pdp_temp[ii];
1086 break;
1087 case CF_MAXIMUM:
1088 if (pdp_temp[ii] > rrd.cdp_prep[iii].scratch[CDP_val].u_val)
1089 rrd.cdp_prep[iii].scratch[CDP_val].u_val = pdp_temp[ii];
1090 break;
1091 case CF_LAST:
1092 default:
1093 rrd.cdp_prep[iii].scratch[CDP_val].u_val = pdp_temp[ii];
1094 break;
1095 }
1096 }
1097 }
1098 } else { /* rrd.rra_def[i].pdp_cnt == 1 */
1099 if (elapsed_pdp_st > 2)
1100 {
1101 switch (current_cf) {
1102 case CF_AVERAGE:
1103 default:
1104 rrd.cdp_prep[iii].scratch[CDP_primary_val].u_val=pdp_temp[ii];
1105 rrd.cdp_prep[iii].scratch[CDP_secondary_val].u_val=pdp_temp[ii];
1106 break;
1107 case CF_SEASONAL:
1108 case CF_DEVSEASONAL:
1109 /* need to update cached seasonal values, so they are consistent
1110 * with the bulk update */
1111 /* WARNING: code relies on the fact that CDP_hw_last_seasonal and
1112 * CDP_last_deviation are the same. */
1113 rrd.cdp_prep[iii].scratch[CDP_hw_last_seasonal].u_val =
1114 last_seasonal_coef[ii];
1115 rrd.cdp_prep[iii].scratch[CDP_hw_seasonal].u_val =
1116 seasonal_coef[ii];
1117 break;
1118 case CF_HWPREDICT:
1119 /* need to update the null_count and last_null_count.
1120 * even do this for non-DNAN pdp_temp because the
1121 * algorithm is not learning from batch updates. */
1122 rrd.cdp_prep[iii].scratch[CDP_null_count].u_cnt +=
1123 elapsed_pdp_st;
1124 rrd.cdp_prep[iii].scratch[CDP_last_null_count].u_cnt +=
1125 elapsed_pdp_st - 1;
1126 /* fall through */
1127 case CF_DEVPREDICT:
1128 rrd.cdp_prep[iii].scratch[CDP_primary_val].u_val = DNAN;
1129 rrd.cdp_prep[iii].scratch[CDP_secondary_val].u_val = DNAN;
1130 break;
1131 case CF_FAILURES:
1132 /* do not count missed bulk values as failures */
1133 rrd.cdp_prep[iii].scratch[CDP_primary_val].u_val = 0;
1134 rrd.cdp_prep[iii].scratch[CDP_secondary_val].u_val = 0;
1135 /* need to reset violations buffer.
1136 * could do this more carefully, but for now, just
1137 * assume a bulk update wipes away all violations. */
1138 erase_violations(&rrd, iii, i);
1139 break;
1140 }
1141 }
1142 } /* endif rrd.rra_def[i].pdp_cnt == 1 */
1144 if (rrd_test_error()) break;
1146 } /* endif data sources loop */
1147 } /* end RRA Loop */
1149 /* this loop is only entered if elapsed_pdp_st < 3 */
1150 for (j = elapsed_pdp_st, scratch_idx = CDP_primary_val;
1151 j > 0 && j < 3; j--, scratch_idx = CDP_secondary_val)
1152 {
1153 for(i = 0, rra_start = rra_begin;
1154 i < rrd.stat_head->rra_cnt;
1155 rra_start += rrd.rra_def[i].row_cnt * rrd.stat_head -> ds_cnt * sizeof(rrd_value_t),
1156 i++)
1157 {
1158 if (rrd.rra_def[i].pdp_cnt > 1) continue;
1160 current_cf = cf_conv(rrd.rra_def[i].cf_nam);
1161 if (current_cf == CF_SEASONAL || current_cf == CF_DEVSEASONAL)
1162 {
1163 lookup_seasonal(&rrd,i,rra_start,rrd_file,
1164 elapsed_pdp_st + (scratch_idx == CDP_primary_val ? 1 : 2),
1165 &seasonal_coef);
1166 rra_current = ftell(rrd_file);
1167 }
1168 if (rrd_test_error()) break;
1169 /* loop over data soures within each RRA */
1170 for(ii = 0;
1171 ii < rrd.stat_head->ds_cnt;
1172 ii++)
1173 {
1174 update_aberrant_CF(&rrd,pdp_temp[ii],current_cf,
1175 i*(rrd.stat_head->ds_cnt) + ii,i,ii,
1176 scratch_idx, seasonal_coef);
1177 }
1178 } /* end RRA Loop */
1179 if (rrd_test_error()) break;
1180 } /* end elapsed_pdp_st loop */
1182 if (rrd_test_error()) break;
1184 /* Ready to write to disk */
1185 /* Move sequentially through the file, writing one RRA at a time.
1186 * Note this architecture divorces the computation of CDP with
1187 * flushing updated RRA entries to disk. */
1188 for(i = 0, rra_start = rra_begin;
1189 i < rrd.stat_head->rra_cnt;
1190 rra_start += rrd.rra_def[i].row_cnt * rrd.stat_head -> ds_cnt * sizeof(rrd_value_t),
1191 i++) {
1192 /* is there anything to write for this RRA? If not, continue. */
1193 if (rra_step_cnt[i] == 0) continue;
1195 /* write the first row */
1196 #ifdef DEBUG
1197 fprintf(stderr," -- RRA Preseek %ld\n",ftell(rrd_file));
1198 #endif
1199 rrd.rra_ptr[i].cur_row++;
1200 if (rrd.rra_ptr[i].cur_row >= rrd.rra_def[i].row_cnt)
1201 rrd.rra_ptr[i].cur_row = 0; /* wrap around */
1202 /* positition on the first row */
1203 rra_pos_tmp = rra_start +
1204 (rrd.stat_head->ds_cnt)*(rrd.rra_ptr[i].cur_row)*sizeof(rrd_value_t);
1205 if(rra_pos_tmp != rra_current) {
1206 if(fseek(rrd_file, rra_pos_tmp, SEEK_SET) != 0){
1207 rrd_set_error("seek error in rrd");
1208 break;
1209 }
1210 rra_current = rra_pos_tmp;
1211 }
1213 #ifdef DEBUG
1214 fprintf(stderr," -- RRA Postseek %ld\n",ftell(rrd_file));
1215 #endif
1216 scratch_idx = CDP_primary_val;
1217 if (pcdp_summary != NULL)
1218 {
1219 rra_time = (current_time - current_time
1220 % (rrd.rra_def[i].pdp_cnt*rrd.stat_head->pdp_step))
1221 - ((rra_step_cnt[i]-1)*rrd.rra_def[i].pdp_cnt*rrd.stat_head->pdp_step);
1222 }
1223 pcdp_summary = write_RRA_row(&rrd, i, &rra_current, scratch_idx, rrd_file,
1224 pcdp_summary, &rra_time);
1225 if (rrd_test_error()) break;
1227 /* write other rows of the bulk update, if any */
1228 scratch_idx = CDP_secondary_val;
1229 for ( ; rra_step_cnt[i] > 1; rra_step_cnt[i]--)
1230 {
1231 if (++rrd.rra_ptr[i].cur_row == rrd.rra_def[i].row_cnt)
1232 {
1233 #ifdef DEBUG
1234 fprintf(stderr,"Wraparound for RRA %s, %lu updates left\n",
1235 rrd.rra_def[i].cf_nam, rra_step_cnt[i] - 1);
1236 #endif
1237 /* wrap */
1238 rrd.rra_ptr[i].cur_row = 0;
1239 /* seek back to beginning of current rra */
1240 if (fseek(rrd_file, rra_start, SEEK_SET) != 0)
1241 {
1242 rrd_set_error("seek error in rrd");
1243 break;
1244 }
1245 #ifdef DEBUG
1246 fprintf(stderr," -- Wraparound Postseek %ld\n",ftell(rrd_file));
1247 #endif
1248 rra_current = rra_start;
1249 }
1250 if (pcdp_summary != NULL)
1251 {
1252 rra_time = (current_time - current_time
1253 % (rrd.rra_def[i].pdp_cnt*rrd.stat_head->pdp_step))
1254 - ((rra_step_cnt[i]-2)*rrd.rra_def[i].pdp_cnt*rrd.stat_head->pdp_step);
1255 }
1256 pcdp_summary = write_RRA_row(&rrd, i, &rra_current, scratch_idx, rrd_file,
1257 pcdp_summary, &rra_time);
1258 }
1260 if (rrd_test_error())
1261 break;
1262 } /* RRA LOOP */
1264 /* break out of the argument parsing loop if error_string is set */
1265 if (rrd_test_error()){
1266 free(step_start);
1267 break;
1268 }
1270 } /* endif a pdp_st has occurred */
1271 rrd.live_head->last_up = current_time;
1272 rrd.live_head->last_up_usec = current_time_usec;
1273 free(step_start);
1274 } /* function argument loop */
1276 if (seasonal_coef != NULL) free(seasonal_coef);
1277 if (last_seasonal_coef != NULL) free(last_seasonal_coef);
1278 if (rra_step_cnt != NULL) free(rra_step_cnt);
1279 rpnstack_free(&rpnstack);
1281 /* if we got here and if there is an error and if the file has not been
1282 * written to, then close things up and return. */
1283 if (rrd_test_error()) {
1284 free(updvals);
1285 free(tmpl_idx);
1286 rrd_free(&rrd);
1287 free(pdp_temp);
1288 free(pdp_new);
1289 fclose(rrd_file);
1290 return(-1);
1291 }
1293 /* aargh ... that was tough ... so many loops ... anyway, its done.
1294 * we just need to write back the live header portion now*/
1296 if (fseek(rrd_file, (sizeof(stat_head_t)
1297 + sizeof(ds_def_t)*rrd.stat_head->ds_cnt
1298 + sizeof(rra_def_t)*rrd.stat_head->rra_cnt),
1299 SEEK_SET) != 0) {
1300 rrd_set_error("seek rrd for live header writeback");
1301 free(updvals);
1302 free(tmpl_idx);
1303 rrd_free(&rrd);
1304 free(pdp_temp);
1305 free(pdp_new);
1306 fclose(rrd_file);
1307 return(-1);
1308 }
1310 if(version >= 3) {
1311 if(fwrite( rrd.live_head,
1312 sizeof(live_head_t), 1, rrd_file) != 1){
1313 rrd_set_error("fwrite live_head to rrd");
1314 free(updvals);
1315 rrd_free(&rrd);
1316 free(tmpl_idx);
1317 free(pdp_temp);
1318 free(pdp_new);
1319 fclose(rrd_file);
1320 return(-1);
1321 }
1322 }
1323 else {
1324 if(fwrite( &rrd.live_head->last_up,
1325 sizeof(time_t), 1, rrd_file) != 1){
1326 rrd_set_error("fwrite live_head to rrd");
1327 free(updvals);
1328 rrd_free(&rrd);
1329 free(tmpl_idx);
1330 free(pdp_temp);
1331 free(pdp_new);
1332 fclose(rrd_file);
1333 return(-1);
1334 }
1335 }
1338 if(fwrite( rrd.pdp_prep,
1339 sizeof(pdp_prep_t),
1340 rrd.stat_head->ds_cnt, rrd_file) != rrd.stat_head->ds_cnt){
1341 rrd_set_error("ftwrite pdp_prep to rrd");
1342 free(updvals);
1343 rrd_free(&rrd);
1344 free(tmpl_idx);
1345 free(pdp_temp);
1346 free(pdp_new);
1347 fclose(rrd_file);
1348 return(-1);
1349 }
1351 if(fwrite( rrd.cdp_prep,
1352 sizeof(cdp_prep_t),
1353 rrd.stat_head->rra_cnt *rrd.stat_head->ds_cnt, rrd_file)
1354 != rrd.stat_head->rra_cnt *rrd.stat_head->ds_cnt){
1356 rrd_set_error("ftwrite cdp_prep to rrd");
1357 free(updvals);
1358 free(tmpl_idx);
1359 rrd_free(&rrd);
1360 free(pdp_temp);
1361 free(pdp_new);
1362 fclose(rrd_file);
1363 return(-1);
1364 }
1366 if(fwrite( rrd.rra_ptr,
1367 sizeof(rra_ptr_t),
1368 rrd.stat_head->rra_cnt,rrd_file) != rrd.stat_head->rra_cnt){
1369 rrd_set_error("fwrite rra_ptr to rrd");
1370 free(updvals);
1371 free(tmpl_idx);
1372 rrd_free(&rrd);
1373 free(pdp_temp);
1374 free(pdp_new);
1375 fclose(rrd_file);
1376 return(-1);
1377 }
1379 /* OK now close the files and free the memory */
1380 if(fclose(rrd_file) != 0){
1381 rrd_set_error("closing rrd");
1382 free(updvals);
1383 free(tmpl_idx);
1384 rrd_free(&rrd);
1385 free(pdp_temp);
1386 free(pdp_new);
1387 return(-1);
1388 }
1390 /* calling the smoothing code here guarantees at most
1391 * one smoothing operation per rrd_update call. Unfortunately,
1392 * it is possible with bulk updates, or a long-delayed update
1393 * for smoothing to occur off-schedule. This really isn't
1394 * critical except during the burning cycles. */
1395 if (schedule_smooth)
1396 {
1397 #ifndef WIN32
1398 rrd_file = fopen(filename,"r+");
1399 #else
1400 rrd_file = fopen(filename,"rb+");
1401 #endif
1402 rra_start = rra_begin;
1403 for (i = 0; i < rrd.stat_head -> rra_cnt; ++i)
1404 {
1405 if (cf_conv(rrd.rra_def[i].cf_nam) == CF_DEVSEASONAL ||
1406 cf_conv(rrd.rra_def[i].cf_nam) == CF_SEASONAL)
1407 {
1408 #ifdef DEBUG
1409 fprintf(stderr,"Running smoother for rra %ld\n",i);
1410 #endif
1411 apply_smoother(&rrd,i,rra_start,rrd_file);
1412 if (rrd_test_error())
1413 break;
1414 }
1415 rra_start += rrd.rra_def[i].row_cnt
1416 *rrd.stat_head->ds_cnt*sizeof(rrd_value_t);
1417 }
1418 fclose(rrd_file);
1419 }
1420 rrd_free(&rrd);
1421 free(updvals);
1422 free(tmpl_idx);
1423 free(pdp_new);
1424 free(pdp_temp);
1425 return(0);
1426 }
/*
 * Try to take an exclusive lock on the whole RRD file without blocking.
 * The lock gets removed when the file is closed.
 *
 * rrdfile: open stream of the RRD file to lock.
 *
 * Returns 0 on success and -1 when the lock could not be obtained, which
 * includes a NULL stream or a stream without a usable file descriptor.
 */
int
LockRRD(FILE *rrdfile)
{
    int rrd_fd;         /* file descriptor behind rrdfile */
    int status;         /* result handed back to the caller */

    /* fileno() must not be handed a NULL stream */
    if (rrdfile == NULL)
        return(-1);

    rrd_fd = fileno(rrdfile);
    /* do not pass an invalid descriptor on to the locking calls */
    if (rrd_fd < 0)
        return(-1);

    {
#ifndef WIN32
        /* start from all zeroes so that members not assigned below
         * (l_pid and any platform specific extras) hold a defined value */
        struct flock lock = {0};

        lock.l_type   = F_WRLCK;    /* exclusive write lock */
        lock.l_whence = SEEK_SET;   /* l_start is relative to the start of the file */
        lock.l_start  = 0;          /* begin at the first byte */
        lock.l_len    = 0;          /* 0 means: up to the end of the file */

        /* F_SETLK fails right away instead of waiting for the lock */
        status = fcntl(rrd_fd, F_SETLK, &lock);
#else
        struct _stat st;

        /* the number of bytes to lock is taken from the current file size */
        if (_fstat(rrd_fd, &st) == 0) {
            /* NOTE(review): _locking() is documented to lock from the
             * current file position; this presumably relies on the caller
             * locking right after opening the file - confirm. */
            status = _locking(rrd_fd, _LK_NBLCK, st.st_size);
        } else {
            status = -1;
        }
#endif
    }

    return(status);
}
1466 info_t
1467 *write_RRA_row (rrd_t *rrd, unsigned long rra_idx, unsigned long *rra_current,
1468 unsigned short CDP_scratch_idx, FILE *rrd_file,
1469 info_t *pcdp_summary, time_t *rra_time)
1470 {
1471 unsigned long ds_idx, cdp_idx;
1472 infoval iv;
1474 for (ds_idx = 0; ds_idx < rrd -> stat_head -> ds_cnt; ds_idx++)
1475 {
1476 /* compute the cdp index */
1477 cdp_idx =rra_idx * (rrd -> stat_head->ds_cnt) + ds_idx;
1478 #ifdef DEBUG
1479 fprintf(stderr," -- RRA WRITE VALUE %e, at %ld CF:%s\n",
1480 rrd -> cdp_prep[cdp_idx].scratch[CDP_scratch_idx].u_val,ftell(rrd_file),
1481 rrd -> rra_def[rra_idx].cf_nam);
1482 #endif
1483 if (pcdp_summary != NULL)
1484 {
1485 iv.u_val = rrd -> cdp_prep[cdp_idx].scratch[CDP_scratch_idx].u_val;
1486 /* append info to the return hash */
1487 pcdp_summary = info_push(pcdp_summary,
1488 sprintf_alloc("[%d]RRA[%s][%lu]DS[%s]",
1489 *rra_time, rrd->rra_def[rra_idx].cf_nam,
1490 rrd->rra_def[rra_idx].pdp_cnt, rrd->ds_def[ds_idx].ds_nam),
1491 RD_I_VAL, iv);
1492 }
1493 if(fwrite(&(rrd -> cdp_prep[cdp_idx].scratch[CDP_scratch_idx].u_val),
1494 sizeof(rrd_value_t),1,rrd_file) != 1)
1495 {
1496 rrd_set_error("writing rrd");
1497 return 0;
1498 }
1499 *rra_current += sizeof(rrd_value_t);
1500 }
1501 return (pcdp_summary);
1502 }