1 /*****************************************************************************
2 * RRDtool 1.1.x Copyright Tobias Oetiker, 1997 - 2004
3 *****************************************************************************
4 * rrd_update.c RRD Update Function
5 *****************************************************************************
6 * $Id$
7 * $Log$
8 * Revision 1.17 2004/05/26 22:11:12 oetiker
9 * reduce compiler warnings. Many small fixes. -- Mike Slifcak <slif@bellsouth.net>
10 *
11 * Revision 1.16 2004/05/25 20:52:16 oetiker
12 * fix spelling and syntax, especially in messages that are printed -- Mike Slifcak
13 *
14 * Revision 1.15 2004/05/25 20:51:49 oetiker
15 * Update displayed copyright messages to be consistent. -- Mike Slifcak
16 *
17 * Revision 1.14 2003/11/11 19:46:21 oetiker
18 * replaced time_value with rrd_time_value as MacOS X introduced a struct of that name in their standard headers
19 *
20 * Revision 1.13 2003/11/11 19:38:03 oetiker
21 * rrd files should NOT change size ever ... bulk update code wa buggy.
22 * -- David M. Grimes <dgrimes@navisite.com>
23 *
24 * Revision 1.12 2003/09/04 13:16:12 oetiker
25 * should not assigne but compare ... grrrrr
26 *
27 * Revision 1.11 2003/09/02 21:58:35 oetiker
28 * be pickier about what we accept in rrd_update. Complain if things do not work out
29 *
30 * Revision 1.10 2003/04/29 19:14:12 jake
31 * Change updatev RRA return from index_number to cf_nam, pdp_cnt.
32 * Also revert accidental addition of -I to aclocal MakeMakefile.
33 *
34 * Revision 1.9 2003/04/25 18:35:08 jake
35 * Alternate update interface, updatev. Returns info about CDPs written to disk as result of update. Output format is similar to rrd_info, a hash of key-values.
36 *
37 * Revision 1.8 2003/03/31 21:22:12 oetiker
38 * enables RRDtool updates with microsecond or in case of windows millisecond
39 * precision. This is needed to reduce time measurement error when archive step
40 * is small. (<30s) -- Sasha Mikheev <sasha@avalon-net.co.il>
41 *
42 * Revision 1.7 2003/02/13 07:05:27 oetiker
43 * Find attached the patch I promised to send to you. Please note that there
44 * are three new source files (src/rrd_is_thread_safe.h, src/rrd_thread_safe.c
45 * and src/rrd_not_thread_safe.c) and the introduction of librrd_th. This
46 * library is identical to librrd, but it contains support code for per-thread
47 * global variables currently used for error information only. This is similar
48 * to how errno per-thread variables are implemented. librrd_th must be linked
49 * alongside of libpthred
50 *
51 * There is also a new file "THREADS", holding some documentation.
52 *
53 * -- Peter Stamfest <peter@stamfest.at>
54 *
55 * Revision 1.6 2002/02/01 20:34:49 oetiker
56 * fixed version number and date/time
57 *
58 * Revision 1.5 2001/05/09 05:31:01 oetiker
59 * Bug fix: when update of multiple PDP/CDP RRAs coincided
60 * with interpolation of multiple PDPs an incorrect value was
61 * stored as the CDP. Especially evident for GAUGE data sources.
62 * Minor changes to rrdcreate.pod. -- Jake Brutlag <jakeb@corp.webtv.net>
63 *
64 * Revision 1.4 2001/03/10 23:54:41 oetiker
65 * Support for COMPUTE data sources (CDEF data sources). Removes the RPN
66 * parser and calculator from rrd_graph and puts then in a new file,
67 * rrd_rpncalc.c. Changes to core files rrd_create and rrd_update. Some
68 * clean-up of aberrant behavior stuff, including a bug fix.
69 * Documentation update (rrdcreate.pod, rrdupdate.pod). Change xml format.
70 * -- Jake Brutlag <jakeb@corp.webtv.net>
71 *
72 * Revision 1.3 2001/03/04 13:01:55 oetiker
73 * Aberrant Behavior Detection support. A brief overview added to rrdtool.pod.
74 * Major updates to rrd_update.c, rrd_create.c. Minor update to other core files.
75 * This is backwards compatible! But new files using the Aberrant stuff are not readable
76 * by old rrdtool versions. See http://cricket.sourceforge.net/aberrant/rrd_hw.htm
77 * -- Jake Brutlag <jakeb@corp.webtv.net>
78 *
79 * Revision 1.2 2001/03/04 11:14:25 oetiker
80 * added at-style-time@value:value syntax to rrd_update
81 * -- Dave Bodenstab <imdave@mcs.net>
82 *
83 * Revision 1.1.1.1 2001/02/25 22:25:06 oetiker
84 * checkin
85 *
86 *****************************************************************************/
88 #include "rrd_tool.h"
89 #include <sys/types.h>
90 #include <fcntl.h>
92 #ifdef WIN32
93 #include <sys/locking.h>
94 #include <sys/stat.h>
95 #include <io.h>
96 #endif
98 #include "rrd_hw.h"
99 #include "rrd_rpncalc.h"
101 #include "rrd_is_thread_safe.h"
103 #ifdef WIN32
104 /*
105 * WIN32 does not have gettimeofday and struct timeval. This is a quick and dirty
106 * replacement.
107 */
108 #include <sys/timeb.h>
110 struct timeval {
111 time_t tv_sec; /* seconds */
112 long tv_usec; /* microseconds */
113 };
115 struct __timezone {
116 int tz_minuteswest; /* minutes W of Greenwich */
117 int tz_dsttime; /* type of dst correction */
118 };
120 static gettimeofday(struct timeval *t, struct __timezone *tz) {
122 struct timeb current_time;
124 _ftime(¤t_time);
126 t->tv_sec = current_time.time;
127 t->tv_usec = current_time.millitm * 1000;
128 }
130 #endif
131 /*
132 * normilize time as returned by gettimeofday. usec part must
133 * be always >= 0
134 */
135 static void normalize_time(struct timeval *t)
136 {
137 if(t->tv_usec < 0) {
138 t->tv_sec--;
139 t->tv_usec += 1000000L;
140 }
141 }
143 /* Local prototypes */
144 int LockRRD(FILE *rrd_file);
145 info_t *write_RRA_row (rrd_t *rrd, unsigned long rra_idx,
146 unsigned long *rra_current,
147 unsigned short CDP_scratch_idx, FILE *rrd_file,
148 info_t *pcdp_summary, time_t *rra_time);
149 int rrd_update_r(char *filename, char *template, int argc, char **argv);
150 int _rrd_update(char *filename, char *template, int argc, char **argv,
151 info_t*);
153 #define IFDNAN(X,Y) (isnan(X) ? (Y) : (X));
156 #ifdef STANDALONE
157 int
158 main(int argc, char **argv){
159 rrd_update(argc,argv);
160 if (rrd_test_error()) {
161 printf("RRDtool 1.1.x Copyright (C) 1997-2004 by Tobias Oetiker <tobi@oetiker.ch>\n\n"
162 "Usage: rrdupdate filename\n"
163 "\t\t\t[--template|-t ds-name:ds-name:...]\n"
164 "\t\t\ttime|N:value[:value...]\n\n"
165 "\t\t\tat-time@value[:value...]\n\n"
166 "\t\t\t[ time:value[:value...] ..]\n\n");
168 printf("ERROR: %s\n",rrd_get_error());
169 rrd_clear_error();
170 return 1;
171 }
172 return 0;
173 }
174 #endif
176 info_t *rrd_update_v(int argc, char **argv)
177 {
178 char *template = NULL;
179 info_t *result = NULL;
180 infoval rc;
182 while (1) {
183 static struct option long_options[] =
184 {
185 {"template", required_argument, 0, 't'},
186 {0,0,0,0}
187 };
188 int option_index = 0;
189 int opt;
190 opt = getopt_long(argc, argv, "t:",
191 long_options, &option_index);
193 if (opt == EOF)
194 break;
196 switch(opt) {
197 case 't':
198 template = optarg;
199 break;
201 case '?':
202 rrd_set_error("unknown option '%s'",argv[optind-1]);
203 rc.u_int = -1;
204 goto end_tag;
205 }
206 }
208 /* need at least 2 arguments: filename, data. */
209 if (argc-optind < 2) {
210 rrd_set_error("Not enough arguments");
211 rc.u_int = -1;
212 goto end_tag;
213 }
214 result = info_push(NULL,sprintf_alloc("return_value"),RD_I_INT,rc);
215 rc.u_int = _rrd_update(argv[optind], template,
216 argc - optind - 1, argv + optind + 1, result);
217 result->value.u_int = rc.u_int;
218 end_tag:
219 return result;
220 }
222 int
223 rrd_update(int argc, char **argv)
224 {
225 char *template = NULL;
226 int rc;
228 while (1) {
229 static struct option long_options[] =
230 {
231 {"template", required_argument, 0, 't'},
232 {0,0,0,0}
233 };
234 int option_index = 0;
235 int opt;
236 opt = getopt_long(argc, argv, "t:",
237 long_options, &option_index);
239 if (opt == EOF)
240 break;
242 switch(opt) {
243 case 't':
244 template = optarg;
245 break;
247 case '?':
248 rrd_set_error("unknown option '%s'",argv[optind-1]);
249 return(-1);
250 }
251 }
253 /* need at least 2 arguments: filename, data. */
254 if (argc-optind < 2) {
255 rrd_set_error("Not enough arguments");
257 return -1;
258 }
260 rc = rrd_update_r(argv[optind], template,
261 argc - optind - 1, argv + optind + 1);
262 return rc;
263 }
265 int
266 rrd_update_r(char *filename, char *template, int argc, char **argv)
267 {
268 return _rrd_update(filename, template, argc, argv, NULL);
269 }
271 int
272 _rrd_update(char *filename, char *template, int argc, char **argv,
273 info_t *pcdp_summary)
274 {
276 int arg_i = 2;
277 short j;
278 unsigned long i,ii,iii=1;
280 unsigned long rra_begin; /* byte pointer to the rra
281 * area in the rrd file. this
282 * pointer never changes value */
283 unsigned long rra_start; /* byte pointer to the rra
284 * area in the rrd file. this
285 * pointer changes as each rrd is
286 * processed. */
287 unsigned long rra_current; /* byte pointer to the current write
288 * spot in the rrd file. */
289 unsigned long rra_pos_tmp; /* temporary byte pointer. */
290 double interval,
291 pre_int,post_int; /* interval between this and
292 * the last run */
293 unsigned long proc_pdp_st; /* which pdp_st was the last
294 * to be processed */
295 unsigned long occu_pdp_st; /* when was the pdp_st
296 * before the last update
297 * time */
298 unsigned long proc_pdp_age; /* how old was the data in
299 * the pdp prep area when it
300 * was last updated */
301 unsigned long occu_pdp_age; /* how long ago was the last
302 * pdp_step time */
303 rrd_value_t *pdp_new; /* prepare the incoming data
304 * to be added the the
305 * existing entry */
306 rrd_value_t *pdp_temp; /* prepare the pdp values
307 * to be added the the
308 * cdp values */
310 long *tmpl_idx; /* index representing the settings
311 transported by the template index */
312 unsigned long tmpl_cnt = 2; /* time and data */
314 FILE *rrd_file;
315 rrd_t rrd;
316 time_t current_time;
317 time_t rra_time; /* time of update for a RRA */
318 unsigned long current_time_usec; /* microseconds part of current time */
319 struct timeval tmp_time; /* used for time conversion */
321 char **updvals;
322 int schedule_smooth = 0;
323 rrd_value_t *seasonal_coef = NULL, *last_seasonal_coef = NULL;
324 /* a vector of future Holt-Winters seasonal coefs */
325 unsigned long elapsed_pdp_st;
326 /* number of elapsed PDP steps since last update */
327 unsigned long *rra_step_cnt = NULL;
328 /* number of rows to be updated in an RRA for a data
329 * value. */
330 unsigned long start_pdp_offset;
331 /* number of PDP steps since the last update that
332 * are assigned to the first CDP to be generated
333 * since the last update. */
334 unsigned short scratch_idx;
335 /* index into the CDP scratch array */
336 enum cf_en current_cf;
337 /* numeric id of the current consolidation function */
338 rpnstack_t rpnstack; /* used for COMPUTE DS */
339 int version; /* rrd version */
340 char *endptr; /* used in the conversion */
342 rpnstack_init(&rpnstack);
344 /* need at least 1 arguments: data. */
345 if (argc < 1) {
346 rrd_set_error("Not enough arguments");
347 return -1;
348 }
352 if(rrd_open(filename,&rrd_file,&rrd, RRD_READWRITE)==-1){
353 return -1;
354 }
355 /* initialize time */
356 version = atoi(rrd.stat_head->version);
357 gettimeofday(&tmp_time, 0);
358 normalize_time(&tmp_time);
359 current_time = tmp_time.tv_sec;
360 if(version >= 3) {
361 current_time_usec = tmp_time.tv_usec;
362 }
363 else {
364 current_time_usec = 0;
365 }
367 rra_current = rra_start = rra_begin = ftell(rrd_file);
368 /* This is defined in the ANSI C standard, section 7.9.5.3:
370 When a file is opened with udpate mode ('+' as the second
371 or third character in the ... list of mode argument
372 variables), both input and ouptut may be performed on the
373 associated stream. However, ... input may not be directly
374 followed by output without an intervening call to a file
375 positioning function, unless the input oepration encounters
376 end-of-file. */
377 fseek(rrd_file, 0, SEEK_CUR);
380 /* get exclusive lock to whole file.
381 * lock gets removed when we close the file.
382 */
383 if (LockRRD(rrd_file) != 0) {
384 rrd_set_error("could not lock RRD");
385 rrd_free(&rrd);
386 fclose(rrd_file);
387 return(-1);
388 }
390 if((updvals = malloc( sizeof(char*) * (rrd.stat_head->ds_cnt+1)))==NULL){
391 rrd_set_error("allocating updvals pointer array");
392 rrd_free(&rrd);
393 fclose(rrd_file);
394 return(-1);
395 }
397 if ((pdp_temp = malloc(sizeof(rrd_value_t)
398 *rrd.stat_head->ds_cnt))==NULL){
399 rrd_set_error("allocating pdp_temp ...");
400 free(updvals);
401 rrd_free(&rrd);
402 fclose(rrd_file);
403 return(-1);
404 }
406 if ((tmpl_idx = malloc(sizeof(unsigned long)
407 *(rrd.stat_head->ds_cnt+1)))==NULL){
408 rrd_set_error("allocating tmpl_idx ...");
409 free(pdp_temp);
410 free(updvals);
411 rrd_free(&rrd);
412 fclose(rrd_file);
413 return(-1);
414 }
415 /* initialize template redirector */
416 /* default config example (assume DS 1 is a CDEF DS)
417 tmpl_idx[0] -> 0; (time)
418 tmpl_idx[1] -> 1; (DS 0)
419 tmpl_idx[2] -> 3; (DS 2)
420 tmpl_idx[3] -> 4; (DS 3) */
421 tmpl_idx[0] = 0; /* time */
422 for (i = 1, ii = 1 ; i <= rrd.stat_head->ds_cnt ; i++)
423 {
424 if (dst_conv(rrd.ds_def[i-1].dst) != DST_CDEF)
425 tmpl_idx[ii++]=i;
426 }
427 tmpl_cnt= ii;
429 if (template) {
430 char *dsname;
431 unsigned int tmpl_len;
432 dsname = template;
433 tmpl_cnt = 1; /* the first entry is the time */
434 tmpl_len = strlen(template);
435 for(i=0;i<=tmpl_len ;i++) {
436 if (template[i] == ':' || template[i] == '\0') {
437 template[i] = '\0';
438 if (tmpl_cnt>rrd.stat_head->ds_cnt){
439 rrd_set_error("Template contains more DS definitions than RRD");
440 free(updvals); free(pdp_temp);
441 free(tmpl_idx); rrd_free(&rrd);
442 fclose(rrd_file); return(-1);
443 }
444 if ((tmpl_idx[tmpl_cnt++] = ds_match(&rrd,dsname)) == -1){
445 rrd_set_error("unknown DS name '%s'",dsname);
446 free(updvals); free(pdp_temp);
447 free(tmpl_idx); rrd_free(&rrd);
448 fclose(rrd_file); return(-1);
449 } else {
450 /* the first element is always the time */
451 tmpl_idx[tmpl_cnt-1]++;
452 /* go to the next entry on the template */
453 dsname = &template[i+1];
454 /* fix the damage we did before */
455 if (i<tmpl_len) {
456 template[i]=':';
457 }
459 }
460 }
461 }
462 }
463 if ((pdp_new = malloc(sizeof(rrd_value_t)
464 *rrd.stat_head->ds_cnt))==NULL){
465 rrd_set_error("allocating pdp_new ...");
466 free(updvals);
467 free(pdp_temp);
468 free(tmpl_idx);
469 rrd_free(&rrd);
470 fclose(rrd_file);
471 return(-1);
472 }
474 /* loop through the arguments. */
475 for(arg_i=0; arg_i<argc;arg_i++) {
476 char *stepper = malloc((strlen(argv[arg_i])+1)*sizeof(char));
477 char *step_start = stepper;
478 char *p;
479 char *parsetime_error = NULL;
480 enum {atstyle, normal} timesyntax;
481 struct rrd_time_value ds_tv;
482 if (stepper == NULL){
483 rrd_set_error("failed duplication argv entry");
484 free(updvals);
485 free(pdp_temp);
486 free(tmpl_idx);
487 rrd_free(&rrd);
488 fclose(rrd_file);
489 return(-1);
490 }
491 /* initialize all ds input to unknown except the first one
492 which has always got to be set */
493 for(ii=1;ii<=rrd.stat_head->ds_cnt;ii++) updvals[ii] = "U";
494 strcpy(stepper,argv[arg_i]);
495 updvals[0]=stepper;
496 /* separate all ds elements; first must be examined separately
497 due to alternate time syntax */
498 if ((p=strchr(stepper,'@'))!=NULL) {
499 timesyntax = atstyle;
500 *p = '\0';
501 stepper = p+1;
502 } else if ((p=strchr(stepper,':'))!=NULL) {
503 timesyntax = normal;
504 *p = '\0';
505 stepper = p+1;
506 } else {
507 rrd_set_error("expected timestamp not found in data source from %s:...",
508 argv[arg_i]);
509 free(step_start);
510 break;
511 }
512 ii=1;
513 updvals[tmpl_idx[ii]] = stepper;
514 while (*stepper) {
515 if (*stepper == ':') {
516 *stepper = '\0';
517 ii++;
518 if (ii<tmpl_cnt){
519 updvals[tmpl_idx[ii]] = stepper+1;
520 }
521 }
522 stepper++;
523 }
525 if (ii != tmpl_cnt-1) {
526 rrd_set_error("expected %lu data source readings (got %lu) from %s:...",
527 tmpl_cnt-1, ii, argv[arg_i]);
528 free(step_start);
529 break;
530 }
532 /* get the time from the reading ... handle N */
533 if (timesyntax == atstyle) {
534 if ((parsetime_error = parsetime(updvals[0], &ds_tv))) {
535 rrd_set_error("ds time: %s: %s", updvals[0], parsetime_error );
536 free(step_start);
537 break;
538 }
539 if (ds_tv.type == RELATIVE_TO_END_TIME ||
540 ds_tv.type == RELATIVE_TO_START_TIME) {
541 rrd_set_error("specifying time relative to the 'start' "
542 "or 'end' makes no sense here: %s",
543 updvals[0]);
544 free(step_start);
545 break;
546 }
548 current_time = mktime(&ds_tv.tm) + ds_tv.offset;
549 current_time_usec = 0; /* FIXME: how to handle usecs here ? */
551 } else if (strcmp(updvals[0],"N")==0){
552 gettimeofday(&tmp_time, 0);
553 normalize_time(&tmp_time);
554 current_time = tmp_time.tv_sec;
555 current_time_usec = tmp_time.tv_usec;
556 } else {
557 double tmp;
558 tmp = strtod(updvals[0], 0);
559 current_time = floor(tmp);
560 current_time_usec = (long)((tmp - current_time) * 1000000L);
561 }
562 /* dont do any correction for old version RRDs */
563 if(version < 3)
564 current_time_usec = 0;
566 if(current_time <= rrd.live_head->last_up){
567 rrd_set_error("illegal attempt to update using time %ld when "
568 "last update time is %ld (minimum one second step)",
569 current_time, rrd.live_head->last_up);
570 free(step_start);
571 break;
572 }
575 /* seek to the beginning of the rra's */
576 if (rra_current != rra_begin) {
577 if(fseek(rrd_file, rra_begin, SEEK_SET) != 0) {
578 rrd_set_error("seek error in rrd");
579 free(step_start);
580 break;
581 }
582 rra_current = rra_begin;
583 }
584 rra_start = rra_begin;
586 /* when was the current pdp started */
587 proc_pdp_age = rrd.live_head->last_up % rrd.stat_head->pdp_step;
588 proc_pdp_st = rrd.live_head->last_up - proc_pdp_age;
590 /* when did the last pdp_st occur */
591 occu_pdp_age = current_time % rrd.stat_head->pdp_step;
592 occu_pdp_st = current_time - occu_pdp_age;
593 /* interval = current_time - rrd.live_head->last_up; */
594 interval = current_time + ((double)current_time_usec - (double)rrd.live_head->last_up_usec)/1000000.0 - rrd.live_head->last_up;
596 if (occu_pdp_st > proc_pdp_st){
597 /* OK we passed the pdp_st moment*/
598 pre_int = occu_pdp_st - rrd.live_head->last_up; /* how much of the input data
599 * occurred before the latest
600 * pdp_st moment*/
601 pre_int -= ((double)rrd.live_head->last_up_usec)/1000000.0; /* adjust usecs */
602 post_int = occu_pdp_age; /* how much after it */
603 post_int += ((double)current_time_usec)/1000000.0; /* adjust usecs */
604 } else {
605 pre_int = interval;
606 post_int = 0;
607 }
609 #ifdef DEBUG
610 printf(
611 "proc_pdp_age %lu\t"
612 "proc_pdp_st %lu\t"
613 "occu_pfp_age %lu\t"
614 "occu_pdp_st %lu\t"
615 "int %lf\t"
616 "pre_int %lf\t"
617 "post_int %lf\n", proc_pdp_age, proc_pdp_st,
618 occu_pdp_age, occu_pdp_st,
619 interval, pre_int, post_int);
620 #endif
622 /* process the data sources and update the pdp_prep
623 * area accordingly */
624 for(i=0;i<rrd.stat_head->ds_cnt;i++){
625 enum dst_en dst_idx;
626 dst_idx= dst_conv(rrd.ds_def[i].dst);
627 /* NOTE: DST_CDEF should never enter this if block, because
628 * updvals[i+1][0] is initialized to 'U'; unless the caller
629 * accidently specified a value for the DST_CDEF. To handle
630 * this case, an extra check is required. */
631 if((updvals[i+1][0] != 'U') &&
632 (dst_idx != DST_CDEF) &&
633 rrd.ds_def[i].par[DS_mrhb_cnt].u_cnt >= interval) {
634 double rate = DNAN;
635 /* the data source type defines how to process the data */
636 /* pdp_new contains rate * time ... eg the bytes
637 * transferred during the interval. Doing it this way saves
638 * a lot of math operations */
641 switch(dst_idx){
642 case DST_COUNTER:
643 case DST_DERIVE:
644 if(rrd.pdp_prep[i].last_ds[0] != 'U'){
645 for(ii=0;updvals[i+1][ii] != '\0';ii++){
646 if(updvals[i+1][ii] < '0' || updvals[i+1][ii] > '9' || (ii==0 && updvals[i+1][ii] == '-')){
647 rrd_set_error("not a simple integer: '%s'",updvals[i+1]);
648 break;
649 }
650 }
651 if (rrd_test_error()){
652 break;
653 }
654 pdp_new[i]= rrd_diff(updvals[i+1],rrd.pdp_prep[i].last_ds);
655 if(dst_idx == DST_COUNTER) {
656 /* simple overflow catcher suggested by Andres Kroonmaa */
657 /* this will fail terribly for non 32 or 64 bit counters ... */
658 /* are there any others in SNMP land ? */
659 if (pdp_new[i] < (double)0.0 )
660 pdp_new[i] += (double)4294967296.0 ; /* 2^32 */
661 if (pdp_new[i] < (double)0.0 )
662 pdp_new[i] += (double)18446744069414584320.0; /* 2^64-2^32 */;
663 }
664 rate = pdp_new[i] / interval;
665 }
666 else {
667 pdp_new[i]= DNAN;
668 }
669 break;
670 case DST_ABSOLUTE:
671 errno = 0;
672 pdp_new[i] = strtod(updvals[i+1],&endptr);
673 if (errno > 0){
674 rrd_set_error("converting '%s' to float: %s",updvals[i+1],rrd_strerror(errno));
675 break;
676 };
677 if (endptr[0] != '\0'){
678 rrd_set_error("conversion of '%s' to float not complete: tail '%s'",updvals[i+1],endptr);
679 break;
680 }
681 rate = pdp_new[i] / interval;
682 break;
683 case DST_GAUGE:
684 errno = 0;
685 pdp_new[i] = strtod(updvals[i+1],&endptr) * interval;
686 if (errno > 0){
687 rrd_set_error("converting '%s' to float: %s",updvals[i+1],rrd_strerror(errno));
688 break;
689 };
690 if (endptr[0] != '\0'){
691 rrd_set_error("conversion of '%s' to float not complete: tail '%s'",updvals[i+1],endptr);
692 break;
693 }
694 rate = pdp_new[i] / interval;
695 break;
696 default:
697 rrd_set_error("rrd contains unknown DS type : '%s'",
698 rrd.ds_def[i].dst);
699 break;
700 }
701 /* break out of this for loop if the error string is set */
702 if (rrd_test_error()){
703 break;
704 }
705 /* make sure pdp_temp is neither too large or too small
706 * if any of these occur it becomes unknown ...
707 * sorry folks ... */
708 if ( ! isnan(rate) &&
709 (( ! isnan(rrd.ds_def[i].par[DS_max_val].u_val) &&
710 rate > rrd.ds_def[i].par[DS_max_val].u_val ) ||
711 ( ! isnan(rrd.ds_def[i].par[DS_min_val].u_val) &&
712 rate < rrd.ds_def[i].par[DS_min_val].u_val ))){
713 pdp_new[i] = DNAN;
714 }
715 } else {
716 /* no news is news all the same */
717 pdp_new[i] = DNAN;
718 }
720 /* make a copy of the command line argument for the next run */
721 #ifdef DEBUG
722 fprintf(stderr,
723 "prep ds[%lu]\t"
724 "last_arg '%s'\t"
725 "this_arg '%s'\t"
726 "pdp_new %10.2f\n",
727 i,
728 rrd.pdp_prep[i].last_ds,
729 updvals[i+1], pdp_new[i]);
730 #endif
731 if(dst_idx == DST_COUNTER || dst_idx == DST_DERIVE){
732 strncpy(rrd.pdp_prep[i].last_ds,
733 updvals[i+1],LAST_DS_LEN-1);
734 rrd.pdp_prep[i].last_ds[LAST_DS_LEN-1]='\0';
735 }
736 }
737 /* break out of the argument parsing loop if the error_string is set */
738 if (rrd_test_error()){
739 free(step_start);
740 break;
741 }
742 /* has a pdp_st moment occurred since the last run ? */
744 if (proc_pdp_st == occu_pdp_st){
745 /* no we have not passed a pdp_st moment. therefore update is simple */
747 for(i=0;i<rrd.stat_head->ds_cnt;i++){
748 if(isnan(pdp_new[i]))
749 rrd.pdp_prep[i].scratch[PDP_unkn_sec_cnt].u_cnt += interval;
750 else
751 rrd.pdp_prep[i].scratch[PDP_val].u_val+= pdp_new[i];
752 #ifdef DEBUG
753 fprintf(stderr,
754 "NO PDP ds[%lu]\t"
755 "value %10.2f\t"
756 "unkn_sec %5lu\n",
757 i,
758 rrd.pdp_prep[i].scratch[PDP_val].u_val,
759 rrd.pdp_prep[i].scratch[PDP_unkn_sec_cnt].u_cnt);
760 #endif
761 }
762 } else {
763 /* an pdp_st has occurred. */
765 /* in pdp_prep[].scratch[PDP_val].u_val we have collected rate*seconds which
766 * occurred up to the last run.
767 pdp_new[] contains rate*seconds from the latest run.
768 pdp_temp[] will contain the rate for cdp */
770 for(i=0;i<rrd.stat_head->ds_cnt;i++){
771 /* update pdp_prep to the current pdp_st */
772 if(isnan(pdp_new[i]))
773 rrd.pdp_prep[i].scratch[PDP_unkn_sec_cnt].u_cnt += pre_int;
774 else
775 rrd.pdp_prep[i].scratch[PDP_val].u_val +=
776 pdp_new[i]/(double)interval*(double)pre_int;
778 /* if too much of the pdp_prep is unknown we dump it */
779 if ((rrd.pdp_prep[i].scratch[PDP_unkn_sec_cnt].u_cnt
780 > rrd.ds_def[i].par[DS_mrhb_cnt].u_cnt) ||
781 (occu_pdp_st-proc_pdp_st <=
782 rrd.pdp_prep[i].scratch[PDP_unkn_sec_cnt].u_cnt)) {
783 pdp_temp[i] = DNAN;
784 } else {
785 pdp_temp[i] = rrd.pdp_prep[i].scratch[PDP_val].u_val
786 / (double)( occu_pdp_st
787 - proc_pdp_st
788 - rrd.pdp_prep[i].scratch[PDP_unkn_sec_cnt].u_cnt);
789 }
791 /* process CDEF data sources; remember each CDEF DS can
792 * only reference other DS with a lower index number */
793 if (dst_conv(rrd.ds_def[i].dst) == DST_CDEF) {
794 rpnp_t *rpnp;
795 rpnp = rpn_expand((rpn_cdefds_t *) &(rrd.ds_def[i].par[DS_cdef]));
796 /* substitue data values for OP_VARIABLE nodes */
797 for (ii = 0; rpnp[ii].op != OP_END; ii++)
798 {
799 if (rpnp[ii].op == OP_VARIABLE) {
800 rpnp[ii].op = OP_NUMBER;
801 rpnp[ii].val = pdp_temp[rpnp[ii].ptr];
802 }
803 }
804 /* run the rpn calculator */
805 if (rpn_calc(rpnp,&rpnstack,0,pdp_temp,i) == -1) {
806 free(rpnp);
807 break; /* exits the data sources pdp_temp loop */
808 }
809 }
811 /* make pdp_prep ready for the next run */
812 if(isnan(pdp_new[i])){
813 rrd.pdp_prep[i].scratch[PDP_unkn_sec_cnt].u_cnt = post_int;
814 rrd.pdp_prep[i].scratch[PDP_val].u_val = 0.0;
815 } else {
816 rrd.pdp_prep[i].scratch[PDP_unkn_sec_cnt].u_cnt = 0;
817 rrd.pdp_prep[i].scratch[PDP_val].u_val =
818 pdp_new[i]/(double)interval*(double)post_int;
819 }
821 #ifdef DEBUG
822 fprintf(stderr,
823 "PDP UPD ds[%lu]\t"
824 "pdp_temp %10.2f\t"
825 "new_prep %10.2f\t"
826 "new_unkn_sec %5lu\n",
827 i, pdp_temp[i],
828 rrd.pdp_prep[i].scratch[PDP_val].u_val,
829 rrd.pdp_prep[i].scratch[PDP_unkn_sec_cnt].u_cnt);
830 #endif
831 }
833 /* if there were errors during the last loop, bail out here */
834 if (rrd_test_error()){
835 free(step_start);
836 break;
837 }
839 /* compute the number of elapsed pdp_st moments */
840 elapsed_pdp_st = (occu_pdp_st - proc_pdp_st) / rrd.stat_head -> pdp_step;
841 #ifdef DEBUG
842 fprintf(stderr,"elapsed PDP steps: %lu\n", elapsed_pdp_st);
843 #endif
844 if (rra_step_cnt == NULL)
845 {
846 rra_step_cnt = (unsigned long *)
847 malloc((rrd.stat_head->rra_cnt)* sizeof(unsigned long));
848 }
850 for(i = 0, rra_start = rra_begin;
851 i < rrd.stat_head->rra_cnt;
852 rra_start += rrd.rra_def[i].row_cnt * rrd.stat_head -> ds_cnt * sizeof(rrd_value_t),
853 i++)
854 {
855 current_cf = cf_conv(rrd.rra_def[i].cf_nam);
856 start_pdp_offset = rrd.rra_def[i].pdp_cnt -
857 (proc_pdp_st / rrd.stat_head -> pdp_step) % rrd.rra_def[i].pdp_cnt;
858 if (start_pdp_offset <= elapsed_pdp_st) {
859 rra_step_cnt[i] = (elapsed_pdp_st - start_pdp_offset) /
860 rrd.rra_def[i].pdp_cnt + 1;
861 } else {
862 rra_step_cnt[i] = 0;
863 }
865 if (current_cf == CF_SEASONAL || current_cf == CF_DEVSEASONAL)
866 {
867 /* If this is a bulk update, we need to skip ahead in the seasonal
868 * arrays so that they will be correct for the next observed value;
869 * note that for the bulk update itself, no update will occur to
870 * DEVSEASONAL or SEASONAL; futhermore, HWPREDICT and DEVPREDICT will
871 * be set to DNAN. */
872 if (rra_step_cnt[i] > 2)
873 {
874 /* skip update by resetting rra_step_cnt[i],
875 * note that this is not data source specific; this is due
876 * to the bulk update, not a DNAN value for the specific data
877 * source. */
878 rra_step_cnt[i] = 0;
879 lookup_seasonal(&rrd,i,rra_start,rrd_file,elapsed_pdp_st,
880 &last_seasonal_coef);
881 lookup_seasonal(&rrd,i,rra_start,rrd_file,elapsed_pdp_st + 1,
882 &seasonal_coef);
883 }
885 /* periodically run a smoother for seasonal effects */
886 /* Need to use first cdp parameter buffer to track
887 * burnin (burnin requires a specific smoothing schedule).
888 * The CDP_init_seasonal parameter is really an RRA level,
889 * not a data source within RRA level parameter, but the rra_def
890 * is read only for rrd_update (not flushed to disk). */
891 iii = i*(rrd.stat_head -> ds_cnt);
892 if (rrd.cdp_prep[iii].scratch[CDP_init_seasonal].u_cnt
893 <= BURNIN_CYCLES)
894 {
895 if (rrd.rra_ptr[i].cur_row + elapsed_pdp_st
896 > rrd.rra_def[i].row_cnt - 1) {
897 /* mark off one of the burnin cycles */
898 ++(rrd.cdp_prep[iii].scratch[CDP_init_seasonal].u_cnt);
899 schedule_smooth = 1;
900 }
901 } else {
902 /* someone has no doubt invented a trick to deal with this
903 * wrap around, but at least this code is clear. */
904 if (rrd.rra_def[i].par[RRA_seasonal_smooth_idx].u_cnt >
905 rrd.rra_ptr[i].cur_row)
906 {
907 /* here elapsed_pdp_st = rra_step_cnt[i] because of 1-1
908 * mapping between PDP and CDP */
909 if (rrd.rra_ptr[i].cur_row + elapsed_pdp_st
910 >= rrd.rra_def[i].par[RRA_seasonal_smooth_idx].u_cnt)
911 {
912 #ifdef DEBUG
913 fprintf(stderr,
914 "schedule_smooth 1: cur_row %lu, elapsed_pdp_st %lu, smooth idx %lu\n",
915 rrd.rra_ptr[i].cur_row, elapsed_pdp_st,
916 rrd.rra_def[i].par[RRA_seasonal_smooth_idx].u_cnt);
917 #endif
918 schedule_smooth = 1;
919 }
920 } else {
921 /* can't rely on negative numbers because we are working with
922 * unsigned values */
923 /* Don't need modulus here. If we've wrapped more than once, only
924 * one smooth is executed at the end. */
925 if (rrd.rra_ptr[i].cur_row + elapsed_pdp_st >= rrd.rra_def[i].row_cnt
926 && rrd.rra_ptr[i].cur_row + elapsed_pdp_st - rrd.rra_def[i].row_cnt
927 >= rrd.rra_def[i].par[RRA_seasonal_smooth_idx].u_cnt)
928 {
929 #ifdef DEBUG
930 fprintf(stderr,
931 "schedule_smooth 2: cur_row %lu, elapsed_pdp_st %lu, smooth idx %lu\n",
932 rrd.rra_ptr[i].cur_row, elapsed_pdp_st,
933 rrd.rra_def[i].par[RRA_seasonal_smooth_idx].u_cnt);
934 #endif
935 schedule_smooth = 1;
936 }
937 }
938 }
940 rra_current = ftell(rrd_file);
941 } /* if cf is DEVSEASONAL or SEASONAL */
943 if (rrd_test_error()) break;
945 /* update CDP_PREP areas */
946 /* loop over data soures within each RRA */
947 for(ii = 0;
948 ii < rrd.stat_head->ds_cnt;
949 ii++)
950 {
952 /* iii indexes the CDP prep area for this data source within the RRA */
953 iii=i*rrd.stat_head->ds_cnt+ii;
955 if (rrd.rra_def[i].pdp_cnt > 1) {
957 if (rra_step_cnt[i] > 0) {
958 /* If we are in this block, as least 1 CDP value will be written to
959 * disk, this is the CDP_primary_val entry. If more than 1 value needs
960 * to be written, then the "fill in" value is the CDP_secondary_val
961 * entry. */
962 if (isnan(pdp_temp[ii]))
963 {
964 rrd.cdp_prep[iii].scratch[CDP_unkn_pdp_cnt].u_cnt += start_pdp_offset;
965 rrd.cdp_prep[iii].scratch[CDP_secondary_val].u_val = DNAN;
966 } else {
967 /* CDP_secondary value is the RRA "fill in" value for intermediary
968 * CDP data entries. No matter the CF, the value is the same because
969 * the average, max, min, and last of a list of identical values is
970 * the same, namely, the value itself. */
971 rrd.cdp_prep[iii].scratch[CDP_secondary_val].u_val = pdp_temp[ii];
972 }
974 if (rrd.cdp_prep[iii].scratch[CDP_unkn_pdp_cnt].u_cnt
975 > rrd.rra_def[i].pdp_cnt*
976 rrd.rra_def[i].par[RRA_cdp_xff_val].u_val)
977 {
978 rrd.cdp_prep[iii].scratch[CDP_primary_val].u_val = DNAN;
979 /* initialize carry over */
980 if (current_cf == CF_AVERAGE) {
981 if (isnan(pdp_temp[ii])) {
982 rrd.cdp_prep[iii].scratch[CDP_val].u_val = DNAN;
983 } else {
984 rrd.cdp_prep[iii].scratch[CDP_val].u_val = pdp_temp[ii] *
985 ((elapsed_pdp_st - start_pdp_offset) % rrd.rra_def[i].pdp_cnt);
986 }
987 } else {
988 rrd.cdp_prep[iii].scratch[CDP_val].u_val = pdp_temp[ii];
989 }
990 } else {
991 rrd_value_t cum_val, cur_val;
992 switch (current_cf) {
993 case CF_AVERAGE:
994 cum_val = IFDNAN(rrd.cdp_prep[iii].scratch[CDP_val].u_val, 0.0);
995 cur_val = IFDNAN(pdp_temp[ii],0.0);
996 rrd.cdp_prep[iii].scratch[CDP_primary_val].u_val =
997 (cum_val + cur_val * start_pdp_offset) /
998 (rrd.rra_def[i].pdp_cnt
999 -rrd.cdp_prep[iii].scratch[CDP_unkn_pdp_cnt].u_cnt);
1000 /* initialize carry over value */
1001 if (isnan(pdp_temp[ii])) {
1002 rrd.cdp_prep[iii].scratch[CDP_val].u_val = DNAN;
1003 } else {
1004 rrd.cdp_prep[iii].scratch[CDP_val].u_val = pdp_temp[ii] *
1005 ((elapsed_pdp_st - start_pdp_offset) % rrd.rra_def[i].pdp_cnt);
1006 }
1007 break;
1008 case CF_MAXIMUM:
1009 cum_val = IFDNAN(rrd.cdp_prep[iii].scratch[CDP_val].u_val, -DINF);
1010 cur_val = IFDNAN(pdp_temp[ii],-DINF);
1011 #ifdef DEBUG
1012 if (isnan(rrd.cdp_prep[iii].scratch[CDP_val].u_val) &&
1013 isnan(pdp_temp[ii])) {
1014 fprintf(stderr,
1015 "RRA %lu, DS %lu, both CDP_val and pdp_temp are DNAN!",
1016 i,ii);
1017 exit(-1);
1018 }
1019 #endif
1020 if (cur_val > cum_val)
1021 rrd.cdp_prep[iii].scratch[CDP_primary_val].u_val = cur_val;
1022 else
1023 rrd.cdp_prep[iii].scratch[CDP_primary_val].u_val = cum_val;
1024 /* initialize carry over value */
1025 rrd.cdp_prep[iii].scratch[CDP_val].u_val = pdp_temp[ii];
1026 break;
1027 case CF_MINIMUM:
1028 cum_val = IFDNAN(rrd.cdp_prep[iii].scratch[CDP_val].u_val, DINF);
1029 cur_val = IFDNAN(pdp_temp[ii],DINF);
1030 #ifdef DEBUG
1031 if (isnan(rrd.cdp_prep[iii].scratch[CDP_val].u_val) &&
1032 isnan(pdp_temp[ii])) {
1033 fprintf(stderr,
1034 "RRA %lu, DS %lu, both CDP_val and pdp_temp are DNAN!",
1035 i,ii);
1036 exit(-1);
1037 }
1038 #endif
1039 if (cur_val < cum_val)
1040 rrd.cdp_prep[iii].scratch[CDP_primary_val].u_val = cur_val;
1041 else
1042 rrd.cdp_prep[iii].scratch[CDP_primary_val].u_val = cum_val;
1043 /* initialize carry over value */
1044 rrd.cdp_prep[iii].scratch[CDP_val].u_val = pdp_temp[ii];
1045 break;
1046 case CF_LAST:
1047 default:
1048 rrd.cdp_prep[iii].scratch[CDP_primary_val].u_val = pdp_temp[ii];
1049 /* initialize carry over value */
1050 rrd.cdp_prep[iii].scratch[CDP_val].u_val = pdp_temp[ii];
1051 break;
1052 }
1053 } /* endif meets xff value requirement for a valid value */
1054 /* initialize carry over CDP_unkn_pdp_cnt, this must after CDP_primary_val
1055 * is set because CDP_unkn_pdp_cnt is required to compute that value. */
1056 if (isnan(pdp_temp[ii]))
1057 rrd.cdp_prep[iii].scratch[CDP_unkn_pdp_cnt].u_cnt =
1058 (elapsed_pdp_st - start_pdp_offset) % rrd.rra_def[i].pdp_cnt;
1059 else
1060 rrd.cdp_prep[iii].scratch[CDP_unkn_pdp_cnt].u_cnt = 0;
1061 } else /* rra_step_cnt[i] == 0 */
1062 {
1063 #ifdef DEBUG
1064 if (isnan(rrd.cdp_prep[iii].scratch[CDP_val].u_val)) {
1065 fprintf(stderr,"schedule CDP_val update, RRA %lu DS %lu, DNAN\n",
1066 i,ii);
1067 } else {
1068 fprintf(stderr,"schedule CDP_val update, RRA %lu DS %lu, %10.2f\n",
1069 i,ii,rrd.cdp_prep[iii].scratch[CDP_val].u_val);
1070 }
1071 #endif
1072 if (isnan(pdp_temp[ii])) {
1073 rrd.cdp_prep[iii].scratch[CDP_unkn_pdp_cnt].u_cnt += elapsed_pdp_st;
1074 } else if (isnan(rrd.cdp_prep[iii].scratch[CDP_val].u_val))
1075 {
1076 if (current_cf == CF_AVERAGE) {
1077 rrd.cdp_prep[iii].scratch[CDP_val].u_val = pdp_temp[ii] *
1078 elapsed_pdp_st;
1079 } else {
1080 rrd.cdp_prep[iii].scratch[CDP_val].u_val = pdp_temp[ii];
1081 }
1082 #ifdef DEBUG
1083 fprintf(stderr,"Initialize CDP_val for RRA %lu DS %lu: %10.2f\n",
1084 i,ii,rrd.cdp_prep[iii].scratch[CDP_val].u_val);
1085 #endif
1086 } else {
1087 switch (current_cf) {
1088 case CF_AVERAGE:
1089 rrd.cdp_prep[iii].scratch[CDP_val].u_val += pdp_temp[ii] *
1090 elapsed_pdp_st;
1091 break;
1092 case CF_MINIMUM:
1093 if (pdp_temp[ii] < rrd.cdp_prep[iii].scratch[CDP_val].u_val)
1094 rrd.cdp_prep[iii].scratch[CDP_val].u_val = pdp_temp[ii];
1095 break;
1096 case CF_MAXIMUM:
1097 if (pdp_temp[ii] > rrd.cdp_prep[iii].scratch[CDP_val].u_val)
1098 rrd.cdp_prep[iii].scratch[CDP_val].u_val = pdp_temp[ii];
1099 break;
1100 case CF_LAST:
1101 default:
1102 rrd.cdp_prep[iii].scratch[CDP_val].u_val = pdp_temp[ii];
1103 break;
1104 }
1105 }
1106 }
1107 } else { /* rrd.rra_def[i].pdp_cnt == 1 */
1108 if (elapsed_pdp_st > 2)
1109 {
1110 switch (current_cf) {
1111 case CF_AVERAGE:
1112 default:
1113 rrd.cdp_prep[iii].scratch[CDP_primary_val].u_val=pdp_temp[ii];
1114 rrd.cdp_prep[iii].scratch[CDP_secondary_val].u_val=pdp_temp[ii];
1115 break;
1116 case CF_SEASONAL:
1117 case CF_DEVSEASONAL:
1118 /* need to update cached seasonal values, so they are consistent
1119 * with the bulk update */
1120 /* WARNING: code relies on the fact that CDP_hw_last_seasonal and
1121 * CDP_last_deviation are the same. */
1122 rrd.cdp_prep[iii].scratch[CDP_hw_last_seasonal].u_val =
1123 last_seasonal_coef[ii];
1124 rrd.cdp_prep[iii].scratch[CDP_hw_seasonal].u_val =
1125 seasonal_coef[ii];
1126 break;
1127 case CF_HWPREDICT:
1128 /* need to update the null_count and last_null_count.
1129 * even do this for non-DNAN pdp_temp because the
1130 * algorithm is not learning from batch updates. */
1131 rrd.cdp_prep[iii].scratch[CDP_null_count].u_cnt +=
1132 elapsed_pdp_st;
1133 rrd.cdp_prep[iii].scratch[CDP_last_null_count].u_cnt +=
1134 elapsed_pdp_st - 1;
1135 /* fall through */
1136 case CF_DEVPREDICT:
1137 rrd.cdp_prep[iii].scratch[CDP_primary_val].u_val = DNAN;
1138 rrd.cdp_prep[iii].scratch[CDP_secondary_val].u_val = DNAN;
1139 break;
1140 case CF_FAILURES:
1141 /* do not count missed bulk values as failures */
1142 rrd.cdp_prep[iii].scratch[CDP_primary_val].u_val = 0;
1143 rrd.cdp_prep[iii].scratch[CDP_secondary_val].u_val = 0;
1144 /* need to reset violations buffer.
1145 * could do this more carefully, but for now, just
1146 * assume a bulk update wipes away all violations. */
1147 erase_violations(&rrd, iii, i);
1148 break;
1149 }
1150 }
1151 } /* endif rrd.rra_def[i].pdp_cnt == 1 */
1153 if (rrd_test_error()) break;
1155 } /* endif data sources loop */
1156 } /* end RRA Loop */
1158 /* this loop is only entered if elapsed_pdp_st < 3 */
1159 for (j = elapsed_pdp_st, scratch_idx = CDP_primary_val;
1160 j > 0 && j < 3; j--, scratch_idx = CDP_secondary_val)
1161 {
1162 for(i = 0, rra_start = rra_begin;
1163 i < rrd.stat_head->rra_cnt;
1164 rra_start += rrd.rra_def[i].row_cnt * rrd.stat_head -> ds_cnt * sizeof(rrd_value_t),
1165 i++)
1166 {
1167 if (rrd.rra_def[i].pdp_cnt > 1) continue;
1169 current_cf = cf_conv(rrd.rra_def[i].cf_nam);
1170 if (current_cf == CF_SEASONAL || current_cf == CF_DEVSEASONAL)
1171 {
1172 lookup_seasonal(&rrd,i,rra_start,rrd_file,
1173 elapsed_pdp_st + (scratch_idx == CDP_primary_val ? 1 : 2),
1174 &seasonal_coef);
1175 rra_current = ftell(rrd_file);
1176 }
1177 if (rrd_test_error()) break;
1178 /* loop over data soures within each RRA */
1179 for(ii = 0;
1180 ii < rrd.stat_head->ds_cnt;
1181 ii++)
1182 {
1183 update_aberrant_CF(&rrd,pdp_temp[ii],current_cf,
1184 i*(rrd.stat_head->ds_cnt) + ii,i,ii,
1185 scratch_idx, seasonal_coef);
1186 }
1187 } /* end RRA Loop */
1188 if (rrd_test_error()) break;
1189 } /* end elapsed_pdp_st loop */
1191 if (rrd_test_error()) break;
1193 /* Ready to write to disk */
1194 /* Move sequentially through the file, writing one RRA at a time.
1195 * Note this architecture divorces the computation of CDP with
1196 * flushing updated RRA entries to disk. */
1197 for(i = 0, rra_start = rra_begin;
1198 i < rrd.stat_head->rra_cnt;
1199 rra_start += rrd.rra_def[i].row_cnt * rrd.stat_head -> ds_cnt * sizeof(rrd_value_t),
1200 i++) {
1201 /* is there anything to write for this RRA? If not, continue. */
1202 if (rra_step_cnt[i] == 0) continue;
1204 /* write the first row */
1205 #ifdef DEBUG
1206 fprintf(stderr," -- RRA Preseek %ld\n",ftell(rrd_file));
1207 #endif
1208 rrd.rra_ptr[i].cur_row++;
1209 if (rrd.rra_ptr[i].cur_row >= rrd.rra_def[i].row_cnt)
1210 rrd.rra_ptr[i].cur_row = 0; /* wrap around */
1211 /* positition on the first row */
1212 rra_pos_tmp = rra_start +
1213 (rrd.stat_head->ds_cnt)*(rrd.rra_ptr[i].cur_row)*sizeof(rrd_value_t);
1214 if(rra_pos_tmp != rra_current) {
1215 if(fseek(rrd_file, rra_pos_tmp, SEEK_SET) != 0){
1216 rrd_set_error("seek error in rrd");
1217 break;
1218 }
1219 rra_current = rra_pos_tmp;
1220 }
1222 #ifdef DEBUG
1223 fprintf(stderr," -- RRA Postseek %ld\n",ftell(rrd_file));
1224 #endif
1225 scratch_idx = CDP_primary_val;
1226 if (pcdp_summary != NULL)
1227 {
1228 rra_time = (current_time - current_time
1229 % (rrd.rra_def[i].pdp_cnt*rrd.stat_head->pdp_step))
1230 - ((rra_step_cnt[i]-1)*rrd.rra_def[i].pdp_cnt*rrd.stat_head->pdp_step);
1231 }
1232 pcdp_summary = write_RRA_row(&rrd, i, &rra_current, scratch_idx, rrd_file,
1233 pcdp_summary, &rra_time);
1234 if (rrd_test_error()) break;
1236 /* write other rows of the bulk update, if any */
1237 scratch_idx = CDP_secondary_val;
1238 for ( ; rra_step_cnt[i] > 1; rra_step_cnt[i]--)
1239 {
1240 if (++rrd.rra_ptr[i].cur_row == rrd.rra_def[i].row_cnt)
1241 {
1242 #ifdef DEBUG
1243 fprintf(stderr,"Wraparound for RRA %s, %lu updates left\n",
1244 rrd.rra_def[i].cf_nam, rra_step_cnt[i] - 1);
1245 #endif
1246 /* wrap */
1247 rrd.rra_ptr[i].cur_row = 0;
1248 /* seek back to beginning of current rra */
1249 if (fseek(rrd_file, rra_start, SEEK_SET) != 0)
1250 {
1251 rrd_set_error("seek error in rrd");
1252 break;
1253 }
1254 #ifdef DEBUG
1255 fprintf(stderr," -- Wraparound Postseek %ld\n",ftell(rrd_file));
1256 #endif
1257 rra_current = rra_start;
1258 }
1259 if (pcdp_summary != NULL)
1260 {
1261 rra_time = (current_time - current_time
1262 % (rrd.rra_def[i].pdp_cnt*rrd.stat_head->pdp_step))
1263 - ((rra_step_cnt[i]-2)*rrd.rra_def[i].pdp_cnt*rrd.stat_head->pdp_step);
1264 }
1265 pcdp_summary = write_RRA_row(&rrd, i, &rra_current, scratch_idx, rrd_file,
1266 pcdp_summary, &rra_time);
1267 }
1269 if (rrd_test_error())
1270 break;
1271 } /* RRA LOOP */
1273 /* break out of the argument parsing loop if error_string is set */
1274 if (rrd_test_error()){
1275 free(step_start);
1276 break;
1277 }
1279 } /* endif a pdp_st has occurred */
1280 rrd.live_head->last_up = current_time;
1281 rrd.live_head->last_up_usec = current_time_usec;
1282 free(step_start);
1283 } /* function argument loop */
1285 if (seasonal_coef != NULL) free(seasonal_coef);
1286 if (last_seasonal_coef != NULL) free(last_seasonal_coef);
1287 if (rra_step_cnt != NULL) free(rra_step_cnt);
1288 rpnstack_free(&rpnstack);
1290 /* if we got here and if there is an error and if the file has not been
1291 * written to, then close things up and return. */
1292 if (rrd_test_error()) {
1293 free(updvals);
1294 free(tmpl_idx);
1295 rrd_free(&rrd);
1296 free(pdp_temp);
1297 free(pdp_new);
1298 fclose(rrd_file);
1299 return(-1);
1300 }
1302 /* aargh ... that was tough ... so many loops ... anyway, its done.
1303 * we just need to write back the live header portion now*/
1305 if (fseek(rrd_file, (sizeof(stat_head_t)
1306 + sizeof(ds_def_t)*rrd.stat_head->ds_cnt
1307 + sizeof(rra_def_t)*rrd.stat_head->rra_cnt),
1308 SEEK_SET) != 0) {
1309 rrd_set_error("seek rrd for live header writeback");
1310 free(updvals);
1311 free(tmpl_idx);
1312 rrd_free(&rrd);
1313 free(pdp_temp);
1314 free(pdp_new);
1315 fclose(rrd_file);
1316 return(-1);
1317 }
1319 if(version >= 3) {
1320 if(fwrite( rrd.live_head,
1321 sizeof(live_head_t), 1, rrd_file) != 1){
1322 rrd_set_error("fwrite live_head to rrd");
1323 free(updvals);
1324 rrd_free(&rrd);
1325 free(tmpl_idx);
1326 free(pdp_temp);
1327 free(pdp_new);
1328 fclose(rrd_file);
1329 return(-1);
1330 }
1331 }
1332 else {
1333 if(fwrite( &rrd.live_head->last_up,
1334 sizeof(time_t), 1, rrd_file) != 1){
1335 rrd_set_error("fwrite live_head to rrd");
1336 free(updvals);
1337 rrd_free(&rrd);
1338 free(tmpl_idx);
1339 free(pdp_temp);
1340 free(pdp_new);
1341 fclose(rrd_file);
1342 return(-1);
1343 }
1344 }
1347 if(fwrite( rrd.pdp_prep,
1348 sizeof(pdp_prep_t),
1349 rrd.stat_head->ds_cnt, rrd_file) != rrd.stat_head->ds_cnt){
1350 rrd_set_error("ftwrite pdp_prep to rrd");
1351 free(updvals);
1352 rrd_free(&rrd);
1353 free(tmpl_idx);
1354 free(pdp_temp);
1355 free(pdp_new);
1356 fclose(rrd_file);
1357 return(-1);
1358 }
1360 if(fwrite( rrd.cdp_prep,
1361 sizeof(cdp_prep_t),
1362 rrd.stat_head->rra_cnt *rrd.stat_head->ds_cnt, rrd_file)
1363 != rrd.stat_head->rra_cnt *rrd.stat_head->ds_cnt){
1365 rrd_set_error("ftwrite cdp_prep to rrd");
1366 free(updvals);
1367 free(tmpl_idx);
1368 rrd_free(&rrd);
1369 free(pdp_temp);
1370 free(pdp_new);
1371 fclose(rrd_file);
1372 return(-1);
1373 }
1375 if(fwrite( rrd.rra_ptr,
1376 sizeof(rra_ptr_t),
1377 rrd.stat_head->rra_cnt,rrd_file) != rrd.stat_head->rra_cnt){
1378 rrd_set_error("fwrite rra_ptr to rrd");
1379 free(updvals);
1380 free(tmpl_idx);
1381 rrd_free(&rrd);
1382 free(pdp_temp);
1383 free(pdp_new);
1384 fclose(rrd_file);
1385 return(-1);
1386 }
1388 /* OK now close the files and free the memory */
1389 if(fclose(rrd_file) != 0){
1390 rrd_set_error("closing rrd");
1391 free(updvals);
1392 free(tmpl_idx);
1393 rrd_free(&rrd);
1394 free(pdp_temp);
1395 free(pdp_new);
1396 return(-1);
1397 }
1399 /* calling the smoothing code here guarantees at most
1400 * one smoothing operation per rrd_update call. Unfortunately,
1401 * it is possible with bulk updates, or a long-delayed update
1402 * for smoothing to occur off-schedule. This really isn't
1403 * critical except during the burning cycles. */
1404 if (schedule_smooth)
1405 {
1406 #ifndef WIN32
1407 rrd_file = fopen(filename,"r+");
1408 #else
1409 rrd_file = fopen(filename,"rb+");
1410 #endif
1411 rra_start = rra_begin;
1412 for (i = 0; i < rrd.stat_head -> rra_cnt; ++i)
1413 {
1414 if (cf_conv(rrd.rra_def[i].cf_nam) == CF_DEVSEASONAL ||
1415 cf_conv(rrd.rra_def[i].cf_nam) == CF_SEASONAL)
1416 {
1417 #ifdef DEBUG
1418 fprintf(stderr,"Running smoother for rra %ld\n",i);
1419 #endif
1420 apply_smoother(&rrd,i,rra_start,rrd_file);
1421 if (rrd_test_error())
1422 break;
1423 }
1424 rra_start += rrd.rra_def[i].row_cnt
1425 *rrd.stat_head->ds_cnt*sizeof(rrd_value_t);
1426 }
1427 fclose(rrd_file);
1428 }
1429 rrd_free(&rrd);
1430 free(updvals);
1431 free(tmpl_idx);
1432 free(pdp_new);
1433 free(pdp_temp);
1434 return(0);
1435 }
1437 /*
1438 * get exclusive lock to whole file.
1439 * lock gets removed when we close the file
1440 *
1441 * returns 0 on success
1442 */
1443 int
1444 LockRRD(FILE *rrdfile)
1445 {
1446 int rrd_fd; /* File descriptor for RRD */
1447 int rcstat;
1449 rrd_fd = fileno(rrdfile);
1451 {
1452 #ifndef WIN32
1453 struct flock lock;
1454 lock.l_type = F_WRLCK; /* exclusive write lock */
1455 lock.l_len = 0; /* whole file */
1456 lock.l_start = 0; /* start of file */
1457 lock.l_whence = SEEK_SET; /* end of file */
1459 rcstat = fcntl(rrd_fd, F_SETLK, &lock);
1460 #else
1461 struct _stat st;
1463 if ( _fstat( rrd_fd, &st ) == 0 ) {
1464 rcstat = _locking ( rrd_fd, _LK_NBLCK, st.st_size );
1465 } else {
1466 rcstat = -1;
1467 }
1468 #endif
1469 }
1471 return(rcstat);
1472 }
1475 info_t
1476 *write_RRA_row (rrd_t *rrd, unsigned long rra_idx, unsigned long *rra_current,
1477 unsigned short CDP_scratch_idx, FILE *rrd_file,
1478 info_t *pcdp_summary, time_t *rra_time)
1479 {
1480 unsigned long ds_idx, cdp_idx;
1481 infoval iv;
1483 for (ds_idx = 0; ds_idx < rrd -> stat_head -> ds_cnt; ds_idx++)
1484 {
1485 /* compute the cdp index */
1486 cdp_idx =rra_idx * (rrd -> stat_head->ds_cnt) + ds_idx;
1487 #ifdef DEBUG
1488 fprintf(stderr," -- RRA WRITE VALUE %e, at %ld CF:%s\n",
1489 rrd -> cdp_prep[cdp_idx].scratch[CDP_scratch_idx].u_val,ftell(rrd_file),
1490 rrd -> rra_def[rra_idx].cf_nam);
1491 #endif
1492 if (pcdp_summary != NULL)
1493 {
1494 iv.u_val = rrd -> cdp_prep[cdp_idx].scratch[CDP_scratch_idx].u_val;
1495 /* append info to the return hash */
1496 pcdp_summary = info_push(pcdp_summary,
1497 sprintf_alloc("[%d]RRA[%s][%lu]DS[%s]",
1498 *rra_time, rrd->rra_def[rra_idx].cf_nam,
1499 rrd->rra_def[rra_idx].pdp_cnt, rrd->ds_def[ds_idx].ds_nam),
1500 RD_I_VAL, iv);
1501 }
1502 if(fwrite(&(rrd -> cdp_prep[cdp_idx].scratch[CDP_scratch_idx].u_val),
1503 sizeof(rrd_value_t),1,rrd_file) != 1)
1504 {
1505 rrd_set_error("writing rrd");
1506 return 0;
1507 }
1508 *rra_current += sizeof(rrd_value_t);
1509 }
1510 return (pcdp_summary);
1511 }