1 /*****************************************************************************
2 * RRDtool 1.1.x Copyright Tobias Oetiker, 1997 - 2004
3 *****************************************************************************
4 * rrd_update.c RRD Update Function
5 *****************************************************************************
6 * $Id$
7 * $Log$
8 * Revision 1.16 2004/05/25 20:52:16 oetiker
9 * fix spelling and syntax, especially in messages that are printed -- Mike Slifcak
10 *
11 * Revision 1.15 2004/05/25 20:51:49 oetiker
12 * Update displayed copyright messages to be consistent. -- Mike Slifcak
13 *
14 * Revision 1.14 2003/11/11 19:46:21 oetiker
15 * replaced time_value with rrd_time_value as MacOS X introduced a struct of that name in their standard headers
16 *
17 * Revision 1.13 2003/11/11 19:38:03 oetiker
18 * rrd files should NOT change size ever ... bulk update code was buggy.
19 * -- David M. Grimes <dgrimes@navisite.com>
20 *
21 * Revision 1.12 2003/09/04 13:16:12 oetiker
22 * should not assigne but compare ... grrrrr
23 *
24 * Revision 1.11 2003/09/02 21:58:35 oetiker
25 * be pickier about what we accept in rrd_update. Complain if things do not work out
26 *
27 * Revision 1.10 2003/04/29 19:14:12 jake
28 * Change updatev RRA return from index_number to cf_nam, pdp_cnt.
29 * Also revert accidental addition of -I to aclocal MakeMakefile.
30 *
31 * Revision 1.9 2003/04/25 18:35:08 jake
32 * Alternate update interface, updatev. Returns info about CDPs written to disk as result of update. Output format is similar to rrd_info, a hash of key-values.
33 *
34 * Revision 1.8 2003/03/31 21:22:12 oetiker
35 * enables RRDtool updates with microsecond or in case of windows millisecond
36 * precision. This is needed to reduce time measurement error when archive step
37 * is small. (<30s) -- Sasha Mikheev <sasha@avalon-net.co.il>
38 *
39 * Revision 1.7 2003/02/13 07:05:27 oetiker
40 * Find attached the patch I promised to send to you. Please note that there
41 * are three new source files (src/rrd_is_thread_safe.h, src/rrd_thread_safe.c
42 * and src/rrd_not_thread_safe.c) and the introduction of librrd_th. This
43 * library is identical to librrd, but it contains support code for per-thread
44 * global variables currently used for error information only. This is similar
45 * to how errno per-thread variables are implemented. librrd_th must be linked
46 * alongside of libpthread
47 *
48 * There is also a new file "THREADS", holding some documentation.
49 *
50 * -- Peter Stamfest <peter@stamfest.at>
51 *
52 * Revision 1.6 2002/02/01 20:34:49 oetiker
53 * fixed version number and date/time
54 *
55 * Revision 1.5 2001/05/09 05:31:01 oetiker
56 * Bug fix: when update of multiple PDP/CDP RRAs coincided
57 * with interpolation of multiple PDPs an incorrect value was
58 * stored as the CDP. Especially evident for GAUGE data sources.
59 * Minor changes to rrdcreate.pod. -- Jake Brutlag <jakeb@corp.webtv.net>
60 *
61 * Revision 1.4 2001/03/10 23:54:41 oetiker
62 * Support for COMPUTE data sources (CDEF data sources). Removes the RPN
63 * parser and calculator from rrd_graph and puts then in a new file,
64 * rrd_rpncalc.c. Changes to core files rrd_create and rrd_update. Some
65 * clean-up of aberrant behavior stuff, including a bug fix.
66 * Documentation update (rrdcreate.pod, rrdupdate.pod). Change xml format.
67 * -- Jake Brutlag <jakeb@corp.webtv.net>
68 *
69 * Revision 1.3 2001/03/04 13:01:55 oetiker
70 * Aberrant Behavior Detection support. A brief overview added to rrdtool.pod.
71 * Major updates to rrd_update.c, rrd_create.c. Minor update to other core files.
72 * This is backwards compatible! But new files using the Aberrant stuff are not readable
73 * by old rrdtool versions. See http://cricket.sourceforge.net/aberrant/rrd_hw.htm
74 * -- Jake Brutlag <jakeb@corp.webtv.net>
75 *
76 * Revision 1.2 2001/03/04 11:14:25 oetiker
77 * added at-style-time@value:value syntax to rrd_update
78 * -- Dave Bodenstab <imdave@mcs.net>
79 *
80 * Revision 1.1.1.1 2001/02/25 22:25:06 oetiker
81 * checkin
82 *
83 *****************************************************************************/
85 #include "rrd_tool.h"
86 #include <sys/types.h>
87 #include <fcntl.h>
89 #ifdef WIN32
90 #include <sys/locking.h>
91 #include <sys/stat.h>
92 #include <io.h>
93 #endif
95 #include "rrd_hw.h"
96 #include "rrd_rpncalc.h"
98 #include "rrd_is_thread_safe.h"
100 #ifdef WIN32
101 /*
102 * WIN32 does not have gettimeofday and struct timeval. This is a quick and dirty
103 * replacement.
104 */
105 #include <sys/timeb.h>
/* Stand-in for struct timeval, which WIN32 does not provide. */
struct timeval {
    time_t tv_sec;      /* seconds */
    long   tv_usec;     /* microseconds */
};
/* Time zone record taken as the second argument of the WIN32
 * gettimeofday replacement defined in this file. */
struct __timezone {
    int tz_minuteswest;     /* minutes W of Greenwich */
    int tz_dsttime;         /* type of dst correction */
};
117 static gettimeofday(struct timeval *t, struct __timezone *tz) {
119 struct timeb current_time;
121 _ftime(¤t_time);
123 t->tv_sec = current_time.time;
124 t->tv_usec = current_time.millitm * 1000;
125 }
127 #endif
/*
 * Normalize a time value as returned by gettimeofday: the usec part
 * must never be negative.  A negative tv_usec is repaired by borrowing
 * exactly one second from tv_sec; non-negative values are left alone.
 */
static void normalize_time(struct timeval *t)
{
    if (t->tv_usec >= 0)
        return;

    t->tv_sec -= 1;
    t->tv_usec += 1000000L;
}
140 /* Local prototypes */
141 int LockRRD(FILE *rrd_file);
142 info_t *write_RRA_row (rrd_t *rrd, unsigned long rra_idx,
143 unsigned long *rra_current,
144 unsigned short CDP_scratch_idx, FILE *rrd_file,
145 info_t *pcdp_summary, time_t *rra_time);
146 int rrd_update_r(char *filename, char *template, int argc, char **argv);
147 int _rrd_update(char *filename, char *template, int argc, char **argv,
148 info_t*);
/* Yield X unless it is NaN, in which case yield the fallback Y.
 * Expands to a plain parenthesized expression (no trailing semicolon),
 * so it can be used inside larger expressions.  X is evaluated twice. */
#define IFDNAN(X,Y) (isnan(X) ? (Y) : (X))
#ifdef STANDALONE
/*
 * Stand-alone entry point: passes the command line to rrd_update().
 * If an error is pending afterwards, the usage text and the error
 * message are printed to stdout, the error is cleared and 1 is
 * returned; otherwise the result is 0.
 */
int
main(int argc, char **argv)
{
    static const char usage_text[] =
        "RRDtool 1.1.x Copyright (C) 1997-2004 by Tobias Oetiker <tobi@oetiker.ch>\n\n"
        "Usage: rrdupdate filename\n"
        "\t\t\t[--template|-t ds-name:ds-name:...]\n"
        "\t\t\ttime|N:value[:value...]\n\n"
        "\t\t\tat-time@value[:value...]\n\n"
        "\t\t\t[ time:value[:value...] ..]\n\n";

    rrd_update(argc, argv);

    if (!rrd_test_error())
        return 0;

    fputs(usage_text, stdout);
    printf("ERROR: %s\n", rrd_get_error());
    rrd_clear_error();
    return 1;
}
#endif
173 info_t *rrd_update_v(int argc, char **argv)
174 {
175 char *template = NULL;
176 info_t *result = NULL;
177 infoval rc;
179 while (1) {
180 static struct option long_options[] =
181 {
182 {"template", required_argument, 0, 't'},
183 {0,0,0,0}
184 };
185 int option_index = 0;
186 int opt;
187 opt = getopt_long(argc, argv, "t:",
188 long_options, &option_index);
190 if (opt == EOF)
191 break;
193 switch(opt) {
194 case 't':
195 template = optarg;
196 break;
198 case '?':
199 rrd_set_error("unknown option '%s'",argv[optind-1]);
200 rc.u_int = -1;
201 goto end_tag;
202 }
203 }
205 /* need at least 2 arguments: filename, data. */
206 if (argc-optind < 2) {
207 rrd_set_error("Not enough arguments");
208 rc.u_int = -1;
209 goto end_tag;
210 }
211 result = info_push(NULL,sprintf_alloc("return_value"),RD_I_INT,rc);
212 rc.u_int = _rrd_update(argv[optind], template,
213 argc - optind - 1, argv + optind + 1, result);
214 result->value.u_int = rc.u_int;
215 end_tag:
216 return result;
217 }
/*
 * Command line front end for an RRD update.
 *
 * argc/argv - command line: an optional --template|-t argument followed
 *             by the RRD filename and at least one data argument.
 *
 * Returns -1 with the rrd error set for an unknown option or when fewer
 * than two non-option arguments remain; otherwise returns whatever
 * rrd_update_r() returns for the filename, template and data arguments.
 */
int
rrd_update(int argc, char **argv)
{
    static struct option long_options[] =
    {
        {"template", required_argument, 0, 't'},
        {0,0,0,0}
    };
    char *template = NULL;
    int   opt;

    /* consume options until getopt_long reports the end of them */
    do {
        int option_index = 0;

        opt = getopt_long(argc, argv, "t:", long_options, &option_index);

        if (opt == 't') {
            template = optarg;
        } else if (opt == '?') {
            rrd_set_error("unknown option '%s'", argv[optind-1]);
            return -1;
        }
    } while (opt != EOF);

    /* what is left must be the filename plus at least one data argument */
    if (argc - optind < 2) {
        rrd_set_error("Not enough arguments");
        return -1;
    }

    return rrd_update_r(argv[optind], template,
                        argc - optind - 1, argv + optind + 1);
}
262 int
263 rrd_update_r(char *filename, char *template, int argc, char **argv)
264 {
265 return _rrd_update(filename, template, argc, argv, NULL);
266 }
268 int
269 _rrd_update(char *filename, char *template, int argc, char **argv,
270 info_t *pcdp_summary)
271 {
273 int arg_i = 2;
274 short j;
275 unsigned long i,ii,iii=1;
277 unsigned long rra_begin; /* byte pointer to the rra
278 * area in the rrd file. this
279 * pointer never changes value */
280 unsigned long rra_start; /* byte pointer to the rra
281 * area in the rrd file. this
282 * pointer changes as each rrd is
283 * processed. */
284 unsigned long rra_current; /* byte pointer to the current write
285 * spot in the rrd file. */
286 unsigned long rra_pos_tmp; /* temporary byte pointer. */
287 double interval,
288 pre_int,post_int; /* interval between this and
289 * the last run */
290 unsigned long proc_pdp_st; /* which pdp_st was the last
291 * to be processed */
292 unsigned long occu_pdp_st; /* when was the pdp_st
293 * before the last update
294 * time */
295 unsigned long proc_pdp_age; /* how old was the data in
296 * the pdp prep area when it
297 * was last updated */
298 unsigned long occu_pdp_age; /* how long ago was the last
299 * pdp_step time */
300 rrd_value_t *pdp_new; /* prepare the incoming data
301 * to be added the the
302 * existing entry */
303 rrd_value_t *pdp_temp; /* prepare the pdp values
304 * to be added the the
305 * cdp values */
307 long *tmpl_idx; /* index representing the settings
308 transported by the template index */
309 unsigned long tmpl_cnt = 2; /* time and data */
311 FILE *rrd_file;
312 rrd_t rrd;
313 time_t current_time;
314 time_t rra_time; /* time of update for a RRA */
315 unsigned long current_time_usec; /* microseconds part of current time */
316 struct timeval tmp_time; /* used for time conversion */
318 char **updvals;
319 int schedule_smooth = 0;
320 rrd_value_t *seasonal_coef = NULL, *last_seasonal_coef = NULL;
321 /* a vector of future Holt-Winters seasonal coefs */
322 unsigned long elapsed_pdp_st;
323 /* number of elapsed PDP steps since last update */
324 unsigned long *rra_step_cnt = NULL;
325 /* number of rows to be updated in an RRA for a data
326 * value. */
327 unsigned long start_pdp_offset;
328 /* number of PDP steps since the last update that
329 * are assigned to the first CDP to be generated
330 * since the last update. */
331 unsigned short scratch_idx;
332 /* index into the CDP scratch array */
333 enum cf_en current_cf;
334 /* numeric id of the current consolidation function */
335 rpnstack_t rpnstack; /* used for COMPUTE DS */
336 int version; /* rrd version */
337 char *endptr; /* used in the conversion */
339 rpnstack_init(&rpnstack);
341 /* need at least 1 arguments: data. */
342 if (argc < 1) {
343 rrd_set_error("Not enough arguments");
344 return -1;
345 }
349 if(rrd_open(filename,&rrd_file,&rrd, RRD_READWRITE)==-1){
350 return -1;
351 }
352 /* initialize time */
353 version = atoi(rrd.stat_head->version);
354 gettimeofday(&tmp_time, 0);
355 normalize_time(&tmp_time);
356 current_time = tmp_time.tv_sec;
357 if(version >= 3) {
358 current_time_usec = tmp_time.tv_usec;
359 }
360 else {
361 current_time_usec = 0;
362 }
364 rra_current = rra_start = rra_begin = ftell(rrd_file);
365 /* This is defined in the ANSI C standard, section 7.9.5.3:
367 When a file is opened with udpate mode ('+' as the second
368 or third character in the ... list of mode argument
369 variables), both input and ouptut may be performed on the
370 associated stream. However, ... input may not be directly
371 followed by output without an intervening call to a file
372 positioning function, unless the input oepration encounters
373 end-of-file. */
374 fseek(rrd_file, 0, SEEK_CUR);
377 /* get exclusive lock to whole file.
378 * lock gets removed when we close the file.
379 */
380 if (LockRRD(rrd_file) != 0) {
381 rrd_set_error("could not lock RRD");
382 rrd_free(&rrd);
383 fclose(rrd_file);
384 return(-1);
385 }
387 if((updvals = malloc( sizeof(char*) * (rrd.stat_head->ds_cnt+1)))==NULL){
388 rrd_set_error("allocating updvals pointer array");
389 rrd_free(&rrd);
390 fclose(rrd_file);
391 return(-1);
392 }
394 if ((pdp_temp = malloc(sizeof(rrd_value_t)
395 *rrd.stat_head->ds_cnt))==NULL){
396 rrd_set_error("allocating pdp_temp ...");
397 free(updvals);
398 rrd_free(&rrd);
399 fclose(rrd_file);
400 return(-1);
401 }
403 if ((tmpl_idx = malloc(sizeof(unsigned long)
404 *(rrd.stat_head->ds_cnt+1)))==NULL){
405 rrd_set_error("allocating tmpl_idx ...");
406 free(pdp_temp);
407 free(updvals);
408 rrd_free(&rrd);
409 fclose(rrd_file);
410 return(-1);
411 }
412 /* initialize template redirector */
413 /* default config example (assume DS 1 is a CDEF DS)
414 tmpl_idx[0] -> 0; (time)
415 tmpl_idx[1] -> 1; (DS 0)
416 tmpl_idx[2] -> 3; (DS 2)
417 tmpl_idx[3] -> 4; (DS 3) */
418 tmpl_idx[0] = 0; /* time */
419 for (i = 1, ii = 1 ; i <= rrd.stat_head->ds_cnt ; i++)
420 {
421 if (dst_conv(rrd.ds_def[i-1].dst) != DST_CDEF)
422 tmpl_idx[ii++]=i;
423 }
424 tmpl_cnt= ii;
426 if (template) {
427 char *dsname;
428 unsigned int tmpl_len;
429 dsname = template;
430 tmpl_cnt = 1; /* the first entry is the time */
431 tmpl_len = strlen(template);
432 for(i=0;i<=tmpl_len ;i++) {
433 if (template[i] == ':' || template[i] == '\0') {
434 template[i] = '\0';
435 if (tmpl_cnt>rrd.stat_head->ds_cnt){
436 rrd_set_error("Template contains more DS definitions than RRD");
437 free(updvals); free(pdp_temp);
438 free(tmpl_idx); rrd_free(&rrd);
439 fclose(rrd_file); return(-1);
440 }
441 if ((tmpl_idx[tmpl_cnt++] = ds_match(&rrd,dsname)) == -1){
442 rrd_set_error("unknown DS name '%s'",dsname);
443 free(updvals); free(pdp_temp);
444 free(tmpl_idx); rrd_free(&rrd);
445 fclose(rrd_file); return(-1);
446 } else {
447 /* the first element is always the time */
448 tmpl_idx[tmpl_cnt-1]++;
449 /* go to the next entry on the template */
450 dsname = &template[i+1];
451 /* fix the damage we did before */
452 if (i<tmpl_len) {
453 template[i]=':';
454 }
456 }
457 }
458 }
459 }
460 if ((pdp_new = malloc(sizeof(rrd_value_t)
461 *rrd.stat_head->ds_cnt))==NULL){
462 rrd_set_error("allocating pdp_new ...");
463 free(updvals);
464 free(pdp_temp);
465 free(tmpl_idx);
466 rrd_free(&rrd);
467 fclose(rrd_file);
468 return(-1);
469 }
471 /* loop through the arguments. */
472 for(arg_i=0; arg_i<argc;arg_i++) {
473 char *stepper = malloc((strlen(argv[arg_i])+1)*sizeof(char));
474 char *step_start = stepper;
475 char *p;
476 char *parsetime_error = NULL;
477 enum {atstyle, normal} timesyntax;
478 struct rrd_time_value ds_tv;
479 if (stepper == NULL){
480 rrd_set_error("failed duplication argv entry");
481 free(updvals);
482 free(pdp_temp);
483 free(tmpl_idx);
484 rrd_free(&rrd);
485 fclose(rrd_file);
486 return(-1);
487 }
488 /* initialize all ds input to unknown except the first one
489 which has always got to be set */
490 for(ii=1;ii<=rrd.stat_head->ds_cnt;ii++) updvals[ii] = "U";
491 strcpy(stepper,argv[arg_i]);
492 updvals[0]=stepper;
493 /* separate all ds elements; first must be examined separately
494 due to alternate time syntax */
495 if ((p=strchr(stepper,'@'))!=NULL) {
496 timesyntax = atstyle;
497 *p = '\0';
498 stepper = p+1;
499 } else if ((p=strchr(stepper,':'))!=NULL) {
500 timesyntax = normal;
501 *p = '\0';
502 stepper = p+1;
503 } else {
504 rrd_set_error("expected timestamp not found in data source from %s:...",
505 argv[arg_i]);
506 free(step_start);
507 break;
508 }
509 ii=1;
510 updvals[tmpl_idx[ii]] = stepper;
511 while (*stepper) {
512 if (*stepper == ':') {
513 *stepper = '\0';
514 ii++;
515 if (ii<tmpl_cnt){
516 updvals[tmpl_idx[ii]] = stepper+1;
517 }
518 }
519 stepper++;
520 }
522 if (ii != tmpl_cnt-1) {
523 rrd_set_error("expected %lu data source readings (got %lu) from %s:...",
524 tmpl_cnt-1, ii, argv[arg_i]);
525 free(step_start);
526 break;
527 }
529 /* get the time from the reading ... handle N */
530 if (timesyntax == atstyle) {
531 if ((parsetime_error = parsetime(updvals[0], &ds_tv))) {
532 rrd_set_error("ds time: %s: %s", updvals[0], parsetime_error );
533 free(step_start);
534 break;
535 }
536 if (ds_tv.type == RELATIVE_TO_END_TIME ||
537 ds_tv.type == RELATIVE_TO_START_TIME) {
538 rrd_set_error("specifying time relative to the 'start' "
539 "or 'end' makes no sense here: %s",
540 updvals[0]);
541 free(step_start);
542 break;
543 }
545 current_time = mktime(&ds_tv.tm) + ds_tv.offset;
546 current_time_usec = 0; /* FIXME: how to handle usecs here ? */
548 } else if (strcmp(updvals[0],"N")==0){
549 gettimeofday(&tmp_time, 0);
550 normalize_time(&tmp_time);
551 current_time = tmp_time.tv_sec;
552 current_time_usec = tmp_time.tv_usec;
553 } else {
554 double tmp;
555 tmp = strtod(updvals[0], 0);
556 current_time = floor(tmp);
557 current_time_usec = (long)((tmp - current_time) * 1000000L);
558 }
559 /* dont do any correction for old version RRDs */
560 if(version < 3)
561 current_time_usec = 0;
563 if(current_time <= rrd.live_head->last_up){
564 rrd_set_error("illegal attempt to update using time %ld when "
565 "last update time is %ld (minimum one second step)",
566 current_time, rrd.live_head->last_up);
567 free(step_start);
568 break;
569 }
572 /* seek to the beginning of the rra's */
573 if (rra_current != rra_begin) {
574 if(fseek(rrd_file, rra_begin, SEEK_SET) != 0) {
575 rrd_set_error("seek error in rrd");
576 free(step_start);
577 break;
578 }
579 rra_current = rra_begin;
580 }
581 rra_start = rra_begin;
583 /* when was the current pdp started */
584 proc_pdp_age = rrd.live_head->last_up % rrd.stat_head->pdp_step;
585 proc_pdp_st = rrd.live_head->last_up - proc_pdp_age;
587 /* when did the last pdp_st occur */
588 occu_pdp_age = current_time % rrd.stat_head->pdp_step;
589 occu_pdp_st = current_time - occu_pdp_age;
590 /* interval = current_time - rrd.live_head->last_up; */
591 interval = current_time + ((double)current_time_usec - (double)rrd.live_head->last_up_usec)/1000000.0 - rrd.live_head->last_up;
593 if (occu_pdp_st > proc_pdp_st){
594 /* OK we passed the pdp_st moment*/
595 pre_int = occu_pdp_st - rrd.live_head->last_up; /* how much of the input data
596 * occurred before the latest
597 * pdp_st moment*/
598 pre_int -= ((double)rrd.live_head->last_up_usec)/1000000.0; /* adjust usecs */
599 post_int = occu_pdp_age; /* how much after it */
600 post_int += ((double)current_time_usec)/1000000.0; /* adjust usecs */
601 } else {
602 pre_int = interval;
603 post_int = 0;
604 }
606 #ifdef DEBUG
607 printf(
608 "proc_pdp_age %lu\t"
609 "proc_pdp_st %lu\t"
610 "occu_pfp_age %lu\t"
611 "occu_pdp_st %lu\t"
612 "int %lf\t"
613 "pre_int %lf\t"
614 "post_int %lf\n", proc_pdp_age, proc_pdp_st,
615 occu_pdp_age, occu_pdp_st,
616 interval, pre_int, post_int);
617 #endif
619 /* process the data sources and update the pdp_prep
620 * area accordingly */
621 for(i=0;i<rrd.stat_head->ds_cnt;i++){
622 enum dst_en dst_idx;
623 dst_idx= dst_conv(rrd.ds_def[i].dst);
624 /* NOTE: DST_CDEF should never enter this if block, because
625 * updvals[i+1][0] is initialized to 'U'; unless the caller
626 * accidently specified a value for the DST_CDEF. To handle
627 * this case, an extra check is required. */
628 if((updvals[i+1][0] != 'U') &&
629 (dst_idx != DST_CDEF) &&
630 rrd.ds_def[i].par[DS_mrhb_cnt].u_cnt >= interval) {
631 double rate = DNAN;
632 /* the data source type defines how to process the data */
633 /* pdp_new contains rate * time ... eg the bytes
634 * transferred during the interval. Doing it this way saves
635 * a lot of math operations */
638 switch(dst_idx){
639 case DST_COUNTER:
640 case DST_DERIVE:
641 if(rrd.pdp_prep[i].last_ds[0] != 'U'){
642 for(ii=0;updvals[i+1][ii] != '\0';ii++){
643 if(updvals[i+1][ii] < '0' || updvals[i+1][ii] > '9' || (ii==0 && updvals[i+1][ii] == '-')){
644 rrd_set_error("not a simple integer: '%s'",updvals[i+1]);
645 break;
646 }
647 }
648 if (rrd_test_error()){
649 break;
650 }
651 pdp_new[i]= rrd_diff(updvals[i+1],rrd.pdp_prep[i].last_ds);
652 if(dst_idx == DST_COUNTER) {
653 /* simple overflow catcher suggested by Andres Kroonmaa */
654 /* this will fail terribly for non 32 or 64 bit counters ... */
655 /* are there any others in SNMP land ? */
656 if (pdp_new[i] < (double)0.0 )
657 pdp_new[i] += (double)4294967296.0 ; /* 2^32 */
658 if (pdp_new[i] < (double)0.0 )
659 pdp_new[i] += (double)18446744069414584320.0; /* 2^64-2^32 */;
660 }
661 rate = pdp_new[i] / interval;
662 }
663 else {
664 pdp_new[i]= DNAN;
665 }
666 break;
667 case DST_ABSOLUTE:
668 errno = 0;
669 pdp_new[i] = strtod(updvals[i+1],&endptr);
670 if (errno > 0){
671 rrd_set_error("converting '%s' to float: %s",updvals[i+1],rrd_strerror(errno));
672 break;
673 };
674 if (endptr[0] != '\0'){
675 rrd_set_error("conversion of '%s' to float not complete: tail '%s'",updvals[i+1],endptr);
676 break;
677 }
678 rate = pdp_new[i] / interval;
679 break;
680 case DST_GAUGE:
681 errno = 0;
682 pdp_new[i] = strtod(updvals[i+1],&endptr) * interval;
683 if (errno > 0){
684 rrd_set_error("converting '%s' to float: %s",updvals[i+1],rrd_strerror(errno));
685 break;
686 };
687 if (endptr[0] != '\0'){
688 rrd_set_error("conversion of '%s' to float not complete: tail '%s'",updvals[i+1],endptr);
689 break;
690 }
691 rate = pdp_new[i] / interval;
692 break;
693 default:
694 rrd_set_error("rrd contains unknown DS type : '%s'",
695 rrd.ds_def[i].dst);
696 break;
697 }
698 /* break out of this for loop if the error string is set */
699 if (rrd_test_error()){
700 break;
701 }
702 /* make sure pdp_temp is neither too large or too small
703 * if any of these occur it becomes unknown ...
704 * sorry folks ... */
705 if ( ! isnan(rate) &&
706 (( ! isnan(rrd.ds_def[i].par[DS_max_val].u_val) &&
707 rate > rrd.ds_def[i].par[DS_max_val].u_val ) ||
708 ( ! isnan(rrd.ds_def[i].par[DS_min_val].u_val) &&
709 rate < rrd.ds_def[i].par[DS_min_val].u_val ))){
710 pdp_new[i] = DNAN;
711 }
712 } else {
713 /* no news is news all the same */
714 pdp_new[i] = DNAN;
715 }
717 /* make a copy of the command line argument for the next run */
718 #ifdef DEBUG
719 fprintf(stderr,
720 "prep ds[%lu]\t"
721 "last_arg '%s'\t"
722 "this_arg '%s'\t"
723 "pdp_new %10.2f\n",
724 i,
725 rrd.pdp_prep[i].last_ds,
726 updvals[i+1], pdp_new[i]);
727 #endif
728 if(dst_idx == DST_COUNTER || dst_idx == DST_DERIVE){
729 strncpy(rrd.pdp_prep[i].last_ds,
730 updvals[i+1],LAST_DS_LEN-1);
731 rrd.pdp_prep[i].last_ds[LAST_DS_LEN-1]='\0';
732 }
733 }
734 /* break out of the argument parsing loop if the error_string is set */
735 if (rrd_test_error()){
736 free(step_start);
737 break;
738 }
739 /* has a pdp_st moment occurred since the last run ? */
741 if (proc_pdp_st == occu_pdp_st){
742 /* no we have not passed a pdp_st moment. therefore update is simple */
744 for(i=0;i<rrd.stat_head->ds_cnt;i++){
745 if(isnan(pdp_new[i]))
746 rrd.pdp_prep[i].scratch[PDP_unkn_sec_cnt].u_cnt += interval;
747 else
748 rrd.pdp_prep[i].scratch[PDP_val].u_val+= pdp_new[i];
749 #ifdef DEBUG
750 fprintf(stderr,
751 "NO PDP ds[%lu]\t"
752 "value %10.2f\t"
753 "unkn_sec %5lu\n",
754 i,
755 rrd.pdp_prep[i].scratch[PDP_val].u_val,
756 rrd.pdp_prep[i].scratch[PDP_unkn_sec_cnt].u_cnt);
757 #endif
758 }
759 } else {
760 /* an pdp_st has occurred. */
762 /* in pdp_prep[].scratch[PDP_val].u_val we have collected rate*seconds which
763 * occurred up to the last run.
764 pdp_new[] contains rate*seconds from the latest run.
765 pdp_temp[] will contain the rate for cdp */
767 for(i=0;i<rrd.stat_head->ds_cnt;i++){
768 /* update pdp_prep to the current pdp_st */
769 if(isnan(pdp_new[i]))
770 rrd.pdp_prep[i].scratch[PDP_unkn_sec_cnt].u_cnt += pre_int;
771 else
772 rrd.pdp_prep[i].scratch[PDP_val].u_val +=
773 pdp_new[i]/(double)interval*(double)pre_int;
775 /* if too much of the pdp_prep is unknown we dump it */
776 if ((rrd.pdp_prep[i].scratch[PDP_unkn_sec_cnt].u_cnt
777 > rrd.ds_def[i].par[DS_mrhb_cnt].u_cnt) ||
778 (occu_pdp_st-proc_pdp_st <=
779 rrd.pdp_prep[i].scratch[PDP_unkn_sec_cnt].u_cnt)) {
780 pdp_temp[i] = DNAN;
781 } else {
782 pdp_temp[i] = rrd.pdp_prep[i].scratch[PDP_val].u_val
783 / (double)( occu_pdp_st
784 - proc_pdp_st
785 - rrd.pdp_prep[i].scratch[PDP_unkn_sec_cnt].u_cnt);
786 }
788 /* process CDEF data sources; remember each CDEF DS can
789 * only reference other DS with a lower index number */
790 if (dst_conv(rrd.ds_def[i].dst) == DST_CDEF) {
791 rpnp_t *rpnp;
792 rpnp = rpn_expand((rpn_cdefds_t *) &(rrd.ds_def[i].par[DS_cdef]));
793 /* substitue data values for OP_VARIABLE nodes */
794 for (ii = 0; rpnp[ii].op != OP_END; ii++)
795 {
796 if (rpnp[ii].op == OP_VARIABLE) {
797 rpnp[ii].op = OP_NUMBER;
798 rpnp[ii].val = pdp_temp[rpnp[ii].ptr];
799 }
800 }
801 /* run the rpn calculator */
802 if (rpn_calc(rpnp,&rpnstack,0,pdp_temp,i) == -1) {
803 free(rpnp);
804 break; /* exits the data sources pdp_temp loop */
805 }
806 }
808 /* make pdp_prep ready for the next run */
809 if(isnan(pdp_new[i])){
810 rrd.pdp_prep[i].scratch[PDP_unkn_sec_cnt].u_cnt = post_int;
811 rrd.pdp_prep[i].scratch[PDP_val].u_val = 0.0;
812 } else {
813 rrd.pdp_prep[i].scratch[PDP_unkn_sec_cnt].u_cnt = 0;
814 rrd.pdp_prep[i].scratch[PDP_val].u_val =
815 pdp_new[i]/(double)interval*(double)post_int;
816 }
818 #ifdef DEBUG
819 fprintf(stderr,
820 "PDP UPD ds[%lu]\t"
821 "pdp_temp %10.2f\t"
822 "new_prep %10.2f\t"
823 "new_unkn_sec %5lu\n",
824 i, pdp_temp[i],
825 rrd.pdp_prep[i].scratch[PDP_val].u_val,
826 rrd.pdp_prep[i].scratch[PDP_unkn_sec_cnt].u_cnt);
827 #endif
828 }
830 /* if there were errors during the last loop, bail out here */
831 if (rrd_test_error()){
832 free(step_start);
833 break;
834 }
836 /* compute the number of elapsed pdp_st moments */
837 elapsed_pdp_st = (occu_pdp_st - proc_pdp_st) / rrd.stat_head -> pdp_step;
838 #ifdef DEBUG
839 fprintf(stderr,"elapsed PDP steps: %lu\n", elapsed_pdp_st);
840 #endif
841 if (rra_step_cnt == NULL)
842 {
843 rra_step_cnt = (unsigned long *)
844 malloc((rrd.stat_head->rra_cnt)* sizeof(unsigned long));
845 }
847 for(i = 0, rra_start = rra_begin;
848 i < rrd.stat_head->rra_cnt;
849 rra_start += rrd.rra_def[i].row_cnt * rrd.stat_head -> ds_cnt * sizeof(rrd_value_t),
850 i++)
851 {
852 current_cf = cf_conv(rrd.rra_def[i].cf_nam);
853 start_pdp_offset = rrd.rra_def[i].pdp_cnt -
854 (proc_pdp_st / rrd.stat_head -> pdp_step) % rrd.rra_def[i].pdp_cnt;
855 if (start_pdp_offset <= elapsed_pdp_st) {
856 rra_step_cnt[i] = (elapsed_pdp_st - start_pdp_offset) /
857 rrd.rra_def[i].pdp_cnt + 1;
858 } else {
859 rra_step_cnt[i] = 0;
860 }
862 if (current_cf == CF_SEASONAL || current_cf == CF_DEVSEASONAL)
863 {
864 /* If this is a bulk update, we need to skip ahead in the seasonal
865 * arrays so that they will be correct for the next observed value;
866 * note that for the bulk update itself, no update will occur to
867 * DEVSEASONAL or SEASONAL; futhermore, HWPREDICT and DEVPREDICT will
868 * be set to DNAN. */
869 if (rra_step_cnt[i] > 2)
870 {
871 /* skip update by resetting rra_step_cnt[i],
872 * note that this is not data source specific; this is due
873 * to the bulk update, not a DNAN value for the specific data
874 * source. */
875 rra_step_cnt[i] = 0;
876 lookup_seasonal(&rrd,i,rra_start,rrd_file,elapsed_pdp_st,
877 &last_seasonal_coef);
878 lookup_seasonal(&rrd,i,rra_start,rrd_file,elapsed_pdp_st + 1,
879 &seasonal_coef);
880 }
882 /* periodically run a smoother for seasonal effects */
883 /* Need to use first cdp parameter buffer to track
884 * burnin (burnin requires a specific smoothing schedule).
885 * The CDP_init_seasonal parameter is really an RRA level,
886 * not a data source within RRA level parameter, but the rra_def
887 * is read only for rrd_update (not flushed to disk). */
888 iii = i*(rrd.stat_head -> ds_cnt);
889 if (rrd.cdp_prep[iii].scratch[CDP_init_seasonal].u_cnt
890 <= BURNIN_CYCLES)
891 {
892 if (rrd.rra_ptr[i].cur_row + elapsed_pdp_st
893 > rrd.rra_def[i].row_cnt - 1) {
894 /* mark off one of the burnin cycles */
895 ++(rrd.cdp_prep[iii].scratch[CDP_init_seasonal].u_cnt);
896 schedule_smooth = 1;
897 }
898 } else {
899 /* someone has no doubt invented a trick to deal with this
900 * wrap around, but at least this code is clear. */
901 if (rrd.rra_def[i].par[RRA_seasonal_smooth_idx].u_cnt >
902 rrd.rra_ptr[i].cur_row)
903 {
904 /* here elapsed_pdp_st = rra_step_cnt[i] because of 1-1
905 * mapping between PDP and CDP */
906 if (rrd.rra_ptr[i].cur_row + elapsed_pdp_st
907 >= rrd.rra_def[i].par[RRA_seasonal_smooth_idx].u_cnt)
908 {
909 #ifdef DEBUG
910 fprintf(stderr,
911 "schedule_smooth 1: cur_row %lu, elapsed_pdp_st %lu, smooth idx %lu\n",
912 rrd.rra_ptr[i].cur_row, elapsed_pdp_st,
913 rrd.rra_def[i].par[RRA_seasonal_smooth_idx].u_cnt);
914 #endif
915 schedule_smooth = 1;
916 }
917 } else {
918 /* can't rely on negative numbers because we are working with
919 * unsigned values */
920 /* Don't need modulus here. If we've wrapped more than once, only
921 * one smooth is executed at the end. */
922 if (rrd.rra_ptr[i].cur_row + elapsed_pdp_st >= rrd.rra_def[i].row_cnt
923 && rrd.rra_ptr[i].cur_row + elapsed_pdp_st - rrd.rra_def[i].row_cnt
924 >= rrd.rra_def[i].par[RRA_seasonal_smooth_idx].u_cnt)
925 {
926 #ifdef DEBUG
927 fprintf(stderr,
928 "schedule_smooth 2: cur_row %lu, elapsed_pdp_st %lu, smooth idx %lu\n",
929 rrd.rra_ptr[i].cur_row, elapsed_pdp_st,
930 rrd.rra_def[i].par[RRA_seasonal_smooth_idx].u_cnt);
931 #endif
932 schedule_smooth = 1;
933 }
934 }
935 }
937 rra_current = ftell(rrd_file);
938 } /* if cf is DEVSEASONAL or SEASONAL */
940 if (rrd_test_error()) break;
942 /* update CDP_PREP areas */
943 /* loop over data soures within each RRA */
944 for(ii = 0;
945 ii < rrd.stat_head->ds_cnt;
946 ii++)
947 {
949 /* iii indexes the CDP prep area for this data source within the RRA */
950 iii=i*rrd.stat_head->ds_cnt+ii;
952 if (rrd.rra_def[i].pdp_cnt > 1) {
954 if (rra_step_cnt[i] > 0) {
955 /* If we are in this block, as least 1 CDP value will be written to
956 * disk, this is the CDP_primary_val entry. If more than 1 value needs
957 * to be written, then the "fill in" value is the CDP_secondary_val
958 * entry. */
959 if (isnan(pdp_temp[ii]))
960 {
961 rrd.cdp_prep[iii].scratch[CDP_unkn_pdp_cnt].u_cnt += start_pdp_offset;
962 rrd.cdp_prep[iii].scratch[CDP_secondary_val].u_val = DNAN;
963 } else {
964 /* CDP_secondary value is the RRA "fill in" value for intermediary
965 * CDP data entries. No matter the CF, the value is the same because
966 * the average, max, min, and last of a list of identical values is
967 * the same, namely, the value itself. */
968 rrd.cdp_prep[iii].scratch[CDP_secondary_val].u_val = pdp_temp[ii];
969 }
971 if (rrd.cdp_prep[iii].scratch[CDP_unkn_pdp_cnt].u_cnt
972 > rrd.rra_def[i].pdp_cnt*
973 rrd.rra_def[i].par[RRA_cdp_xff_val].u_val)
974 {
975 rrd.cdp_prep[iii].scratch[CDP_primary_val].u_val = DNAN;
976 /* initialize carry over */
977 if (current_cf == CF_AVERAGE) {
978 if (isnan(pdp_temp[ii])) {
979 rrd.cdp_prep[iii].scratch[CDP_val].u_val = DNAN;
980 } else {
981 rrd.cdp_prep[iii].scratch[CDP_val].u_val = pdp_temp[ii] *
982 ((elapsed_pdp_st - start_pdp_offset) % rrd.rra_def[i].pdp_cnt);
983 }
984 } else {
985 rrd.cdp_prep[iii].scratch[CDP_val].u_val = pdp_temp[ii];
986 }
987 } else {
988 rrd_value_t cum_val, cur_val;
989 switch (current_cf) {
990 case CF_AVERAGE:
991 cum_val = IFDNAN(rrd.cdp_prep[iii].scratch[CDP_val].u_val, 0.0);
992 cur_val = IFDNAN(pdp_temp[ii],0.0);
993 rrd.cdp_prep[iii].scratch[CDP_primary_val].u_val =
994 (cum_val + cur_val * start_pdp_offset) /
995 (rrd.rra_def[i].pdp_cnt
996 -rrd.cdp_prep[iii].scratch[CDP_unkn_pdp_cnt].u_cnt);
997 /* initialize carry over value */
998 if (isnan(pdp_temp[ii])) {
999 rrd.cdp_prep[iii].scratch[CDP_val].u_val = DNAN;
1000 } else {
1001 rrd.cdp_prep[iii].scratch[CDP_val].u_val = pdp_temp[ii] *
1002 ((elapsed_pdp_st - start_pdp_offset) % rrd.rra_def[i].pdp_cnt);
1003 }
1004 break;
1005 case CF_MAXIMUM:
1006 cum_val = IFDNAN(rrd.cdp_prep[iii].scratch[CDP_val].u_val, -DINF);
1007 cur_val = IFDNAN(pdp_temp[ii],-DINF);
1008 #ifdef DEBUG
1009 if (isnan(rrd.cdp_prep[iii].scratch[CDP_val].u_val) &&
1010 isnan(pdp_temp[ii])) {
1011 fprintf(stderr,
1012 "RRA %lu, DS %lu, both CDP_val and pdp_temp are DNAN!",
1013 i,ii);
1014 exit(-1);
1015 }
1016 #endif
1017 if (cur_val > cum_val)
1018 rrd.cdp_prep[iii].scratch[CDP_primary_val].u_val = cur_val;
1019 else
1020 rrd.cdp_prep[iii].scratch[CDP_primary_val].u_val = cum_val;
1021 /* initialize carry over value */
1022 rrd.cdp_prep[iii].scratch[CDP_val].u_val = pdp_temp[ii];
1023 break;
1024 case CF_MINIMUM:
1025 cum_val = IFDNAN(rrd.cdp_prep[iii].scratch[CDP_val].u_val, DINF);
1026 cur_val = IFDNAN(pdp_temp[ii],DINF);
1027 #ifdef DEBUG
1028 if (isnan(rrd.cdp_prep[iii].scratch[CDP_val].u_val) &&
1029 isnan(pdp_temp[ii])) {
1030 fprintf(stderr,
1031 "RRA %lu, DS %lu, both CDP_val and pdp_temp are DNAN!",
1032 i,ii);
1033 exit(-1);
1034 }
1035 #endif
1036 if (cur_val < cum_val)
1037 rrd.cdp_prep[iii].scratch[CDP_primary_val].u_val = cur_val;
1038 else
1039 rrd.cdp_prep[iii].scratch[CDP_primary_val].u_val = cum_val;
1040 /* initialize carry over value */
1041 rrd.cdp_prep[iii].scratch[CDP_val].u_val = pdp_temp[ii];
1042 break;
1043 case CF_LAST:
1044 default:
1045 rrd.cdp_prep[iii].scratch[CDP_primary_val].u_val = pdp_temp[ii];
1046 /* initialize carry over value */
1047 rrd.cdp_prep[iii].scratch[CDP_val].u_val = pdp_temp[ii];
1048 break;
1049 }
1050 } /* endif meets xff value requirement for a valid value */
1051 /* initialize carry over CDP_unkn_pdp_cnt, this must after CDP_primary_val
1052 * is set because CDP_unkn_pdp_cnt is required to compute that value. */
1053 if (isnan(pdp_temp[ii]))
1054 rrd.cdp_prep[iii].scratch[CDP_unkn_pdp_cnt].u_cnt =
1055 (elapsed_pdp_st - start_pdp_offset) % rrd.rra_def[i].pdp_cnt;
1056 else
1057 rrd.cdp_prep[iii].scratch[CDP_unkn_pdp_cnt].u_cnt = 0;
1058 } else /* rra_step_cnt[i] == 0 */
1059 {
1060 #ifdef DEBUG
1061 if (isnan(rrd.cdp_prep[iii].scratch[CDP_val].u_val)) {
1062 fprintf(stderr,"schedule CDP_val update, RRA %lu DS %lu, DNAN\n",
1063 i,ii);
1064 } else {
1065 fprintf(stderr,"schedule CDP_val update, RRA %lu DS %lu, %10.2f\n",
1066 i,ii,rrd.cdp_prep[iii].scratch[CDP_val].u_val);
1067 }
1068 #endif
1069 if (isnan(pdp_temp[ii])) {
1070 rrd.cdp_prep[iii].scratch[CDP_unkn_pdp_cnt].u_cnt += elapsed_pdp_st;
1071 } else if (isnan(rrd.cdp_prep[iii].scratch[CDP_val].u_val))
1072 {
1073 if (current_cf == CF_AVERAGE) {
1074 rrd.cdp_prep[iii].scratch[CDP_val].u_val = pdp_temp[ii] *
1075 elapsed_pdp_st;
1076 } else {
1077 rrd.cdp_prep[iii].scratch[CDP_val].u_val = pdp_temp[ii];
1078 }
1079 #ifdef DEBUG
1080 fprintf(stderr,"Initialize CDP_val for RRA %lu DS %lu: %10.2f\n",
1081 i,ii,rrd.cdp_prep[iii].scratch[CDP_val].u_val);
1082 #endif
1083 } else {
1084 switch (current_cf) {
1085 case CF_AVERAGE:
1086 rrd.cdp_prep[iii].scratch[CDP_val].u_val += pdp_temp[ii] *
1087 elapsed_pdp_st;
1088 break;
1089 case CF_MINIMUM:
1090 if (pdp_temp[ii] < rrd.cdp_prep[iii].scratch[CDP_val].u_val)
1091 rrd.cdp_prep[iii].scratch[CDP_val].u_val = pdp_temp[ii];
1092 break;
1093 case CF_MAXIMUM:
1094 if (pdp_temp[ii] > rrd.cdp_prep[iii].scratch[CDP_val].u_val)
1095 rrd.cdp_prep[iii].scratch[CDP_val].u_val = pdp_temp[ii];
1096 break;
1097 case CF_LAST:
1098 default:
1099 rrd.cdp_prep[iii].scratch[CDP_val].u_val = pdp_temp[ii];
1100 break;
1101 }
1102 }
1103 }
1104 } else { /* rrd.rra_def[i].pdp_cnt == 1 */
1105 if (elapsed_pdp_st > 2)
1106 {
1107 switch (current_cf) {
1108 case CF_AVERAGE:
1109 default:
1110 rrd.cdp_prep[iii].scratch[CDP_primary_val].u_val=pdp_temp[ii];
1111 rrd.cdp_prep[iii].scratch[CDP_secondary_val].u_val=pdp_temp[ii];
1112 break;
1113 case CF_SEASONAL:
1114 case CF_DEVSEASONAL:
1115 /* need to update cached seasonal values, so they are consistent
1116 * with the bulk update */
1117 /* WARNING: code relies on the fact that CDP_hw_last_seasonal and
1118 * CDP_last_deviation are the same. */
1119 rrd.cdp_prep[iii].scratch[CDP_hw_last_seasonal].u_val =
1120 last_seasonal_coef[ii];
1121 rrd.cdp_prep[iii].scratch[CDP_hw_seasonal].u_val =
1122 seasonal_coef[ii];
1123 break;
1124 case CF_HWPREDICT:
1125 /* need to update the null_count and last_null_count.
1126 * even do this for non-DNAN pdp_temp because the
1127 * algorithm is not learning from batch updates. */
1128 rrd.cdp_prep[iii].scratch[CDP_null_count].u_cnt +=
1129 elapsed_pdp_st;
1130 rrd.cdp_prep[iii].scratch[CDP_last_null_count].u_cnt +=
1131 elapsed_pdp_st - 1;
1132 /* fall through */
1133 case CF_DEVPREDICT:
1134 rrd.cdp_prep[iii].scratch[CDP_primary_val].u_val = DNAN;
1135 rrd.cdp_prep[iii].scratch[CDP_secondary_val].u_val = DNAN;
1136 break;
1137 case CF_FAILURES:
1138 /* do not count missed bulk values as failures */
1139 rrd.cdp_prep[iii].scratch[CDP_primary_val].u_val = 0;
1140 rrd.cdp_prep[iii].scratch[CDP_secondary_val].u_val = 0;
1141 /* need to reset violations buffer.
1142 * could do this more carefully, but for now, just
1143 * assume a bulk update wipes away all violations. */
1144 erase_violations(&rrd, iii, i);
1145 break;
1146 }
1147 }
1148 } /* endif rrd.rra_def[i].pdp_cnt == 1 */
1150 if (rrd_test_error()) break;
1152 } /* endif data sources loop */
1153 } /* end RRA Loop */
1155 /* this loop is only entered if elapsed_pdp_st < 3 */
1156 for (j = elapsed_pdp_st, scratch_idx = CDP_primary_val;
1157 j > 0 && j < 3; j--, scratch_idx = CDP_secondary_val)
1158 {
1159 for(i = 0, rra_start = rra_begin;
1160 i < rrd.stat_head->rra_cnt;
1161 rra_start += rrd.rra_def[i].row_cnt * rrd.stat_head -> ds_cnt * sizeof(rrd_value_t),
1162 i++)
1163 {
1164 if (rrd.rra_def[i].pdp_cnt > 1) continue;
1166 current_cf = cf_conv(rrd.rra_def[i].cf_nam);
1167 if (current_cf == CF_SEASONAL || current_cf == CF_DEVSEASONAL)
1168 {
1169 lookup_seasonal(&rrd,i,rra_start,rrd_file,
1170 elapsed_pdp_st + (scratch_idx == CDP_primary_val ? 1 : 2),
1171 &seasonal_coef);
1172 rra_current = ftell(rrd_file);
1173 }
1174 if (rrd_test_error()) break;
1175 /* loop over data soures within each RRA */
1176 for(ii = 0;
1177 ii < rrd.stat_head->ds_cnt;
1178 ii++)
1179 {
1180 update_aberrant_CF(&rrd,pdp_temp[ii],current_cf,
1181 i*(rrd.stat_head->ds_cnt) + ii,i,ii,
1182 scratch_idx, seasonal_coef);
1183 }
1184 } /* end RRA Loop */
1185 if (rrd_test_error()) break;
1186 } /* end elapsed_pdp_st loop */
1188 if (rrd_test_error()) break;
1190 /* Ready to write to disk */
1191 /* Move sequentially through the file, writing one RRA at a time.
1192 * Note this architecture divorces the computation of CDP with
1193 * flushing updated RRA entries to disk. */
1194 for(i = 0, rra_start = rra_begin;
1195 i < rrd.stat_head->rra_cnt;
1196 rra_start += rrd.rra_def[i].row_cnt * rrd.stat_head -> ds_cnt * sizeof(rrd_value_t),
1197 i++) {
1198 /* is there anything to write for this RRA? If not, continue. */
1199 if (rra_step_cnt[i] == 0) continue;
1201 /* write the first row */
1202 #ifdef DEBUG
1203 fprintf(stderr," -- RRA Preseek %ld\n",ftell(rrd_file));
1204 #endif
1205 rrd.rra_ptr[i].cur_row++;
1206 if (rrd.rra_ptr[i].cur_row >= rrd.rra_def[i].row_cnt)
1207 rrd.rra_ptr[i].cur_row = 0; /* wrap around */
1208 /* positition on the first row */
1209 rra_pos_tmp = rra_start +
1210 (rrd.stat_head->ds_cnt)*(rrd.rra_ptr[i].cur_row)*sizeof(rrd_value_t);
1211 if(rra_pos_tmp != rra_current) {
1212 if(fseek(rrd_file, rra_pos_tmp, SEEK_SET) != 0){
1213 rrd_set_error("seek error in rrd");
1214 break;
1215 }
1216 rra_current = rra_pos_tmp;
1217 }
1219 #ifdef DEBUG
1220 fprintf(stderr," -- RRA Postseek %ld\n",ftell(rrd_file));
1221 #endif
1222 scratch_idx = CDP_primary_val;
1223 if (pcdp_summary != NULL)
1224 {
1225 rra_time = (current_time - current_time
1226 % (rrd.rra_def[i].pdp_cnt*rrd.stat_head->pdp_step))
1227 - ((rra_step_cnt[i]-1)*rrd.rra_def[i].pdp_cnt*rrd.stat_head->pdp_step);
1228 }
1229 pcdp_summary = write_RRA_row(&rrd, i, &rra_current, scratch_idx, rrd_file,
1230 pcdp_summary, &rra_time);
1231 if (rrd_test_error()) break;
1233 /* write other rows of the bulk update, if any */
1234 scratch_idx = CDP_secondary_val;
1235 for ( ; rra_step_cnt[i] > 1; rra_step_cnt[i]--)
1236 {
1237 if (++rrd.rra_ptr[i].cur_row == rrd.rra_def[i].row_cnt)
1238 {
1239 #ifdef DEBUG
1240 fprintf(stderr,"Wraparound for RRA %s, %lu updates left\n",
1241 rrd.rra_def[i].cf_nam, rra_step_cnt[i] - 1);
1242 #endif
1243 /* wrap */
1244 rrd.rra_ptr[i].cur_row = 0;
1245 /* seek back to beginning of current rra */
1246 if (fseek(rrd_file, rra_start, SEEK_SET) != 0)
1247 {
1248 rrd_set_error("seek error in rrd");
1249 break;
1250 }
1251 #ifdef DEBUG
1252 fprintf(stderr," -- Wraparound Postseek %ld\n",ftell(rrd_file));
1253 #endif
1254 rra_current = rra_start;
1255 }
1256 if (pcdp_summary != NULL)
1257 {
1258 rra_time = (current_time - current_time
1259 % (rrd.rra_def[i].pdp_cnt*rrd.stat_head->pdp_step))
1260 - ((rra_step_cnt[i]-2)*rrd.rra_def[i].pdp_cnt*rrd.stat_head->pdp_step);
1261 }
1262 pcdp_summary = write_RRA_row(&rrd, i, &rra_current, scratch_idx, rrd_file,
1263 pcdp_summary, &rra_time);
1264 }
1266 if (rrd_test_error())
1267 break;
1268 } /* RRA LOOP */
1270 /* break out of the argument parsing loop if error_string is set */
1271 if (rrd_test_error()){
1272 free(step_start);
1273 break;
1274 }
1276 } /* endif a pdp_st has occurred */
1277 rrd.live_head->last_up = current_time;
1278 rrd.live_head->last_up_usec = current_time_usec;
1279 free(step_start);
1280 } /* function argument loop */
1282 if (seasonal_coef != NULL) free(seasonal_coef);
1283 if (last_seasonal_coef != NULL) free(last_seasonal_coef);
1284 if (rra_step_cnt != NULL) free(rra_step_cnt);
1285 rpnstack_free(&rpnstack);
1287 /* if we got here and if there is an error and if the file has not been
1288 * written to, then close things up and return. */
1289 if (rrd_test_error()) {
1290 free(updvals);
1291 free(tmpl_idx);
1292 rrd_free(&rrd);
1293 free(pdp_temp);
1294 free(pdp_new);
1295 fclose(rrd_file);
1296 return(-1);
1297 }
1299 /* aargh ... that was tough ... so many loops ... anyway, its done.
1300 * we just need to write back the live header portion now*/
1302 if (fseek(rrd_file, (sizeof(stat_head_t)
1303 + sizeof(ds_def_t)*rrd.stat_head->ds_cnt
1304 + sizeof(rra_def_t)*rrd.stat_head->rra_cnt),
1305 SEEK_SET) != 0) {
1306 rrd_set_error("seek rrd for live header writeback");
1307 free(updvals);
1308 free(tmpl_idx);
1309 rrd_free(&rrd);
1310 free(pdp_temp);
1311 free(pdp_new);
1312 fclose(rrd_file);
1313 return(-1);
1314 }
1316 if(version >= 3) {
1317 if(fwrite( rrd.live_head,
1318 sizeof(live_head_t), 1, rrd_file) != 1){
1319 rrd_set_error("fwrite live_head to rrd");
1320 free(updvals);
1321 rrd_free(&rrd);
1322 free(tmpl_idx);
1323 free(pdp_temp);
1324 free(pdp_new);
1325 fclose(rrd_file);
1326 return(-1);
1327 }
1328 }
1329 else {
1330 if(fwrite( &rrd.live_head->last_up,
1331 sizeof(time_t), 1, rrd_file) != 1){
1332 rrd_set_error("fwrite live_head to rrd");
1333 free(updvals);
1334 rrd_free(&rrd);
1335 free(tmpl_idx);
1336 free(pdp_temp);
1337 free(pdp_new);
1338 fclose(rrd_file);
1339 return(-1);
1340 }
1341 }
1344 if(fwrite( rrd.pdp_prep,
1345 sizeof(pdp_prep_t),
1346 rrd.stat_head->ds_cnt, rrd_file) != rrd.stat_head->ds_cnt){
1347 rrd_set_error("ftwrite pdp_prep to rrd");
1348 free(updvals);
1349 rrd_free(&rrd);
1350 free(tmpl_idx);
1351 free(pdp_temp);
1352 free(pdp_new);
1353 fclose(rrd_file);
1354 return(-1);
1355 }
1357 if(fwrite( rrd.cdp_prep,
1358 sizeof(cdp_prep_t),
1359 rrd.stat_head->rra_cnt *rrd.stat_head->ds_cnt, rrd_file)
1360 != rrd.stat_head->rra_cnt *rrd.stat_head->ds_cnt){
1362 rrd_set_error("ftwrite cdp_prep to rrd");
1363 free(updvals);
1364 free(tmpl_idx);
1365 rrd_free(&rrd);
1366 free(pdp_temp);
1367 free(pdp_new);
1368 fclose(rrd_file);
1369 return(-1);
1370 }
1372 if(fwrite( rrd.rra_ptr,
1373 sizeof(rra_ptr_t),
1374 rrd.stat_head->rra_cnt,rrd_file) != rrd.stat_head->rra_cnt){
1375 rrd_set_error("fwrite rra_ptr to rrd");
1376 free(updvals);
1377 free(tmpl_idx);
1378 rrd_free(&rrd);
1379 free(pdp_temp);
1380 free(pdp_new);
1381 fclose(rrd_file);
1382 return(-1);
1383 }
1385 /* OK now close the files and free the memory */
1386 if(fclose(rrd_file) != 0){
1387 rrd_set_error("closing rrd");
1388 free(updvals);
1389 free(tmpl_idx);
1390 rrd_free(&rrd);
1391 free(pdp_temp);
1392 free(pdp_new);
1393 return(-1);
1394 }
1396 /* calling the smoothing code here guarantees at most
1397 * one smoothing operation per rrd_update call. Unfortunately,
1398 * it is possible with bulk updates, or a long-delayed update
1399 * for smoothing to occur off-schedule. This really isn't
1400 * critical except during the burning cycles. */
1401 if (schedule_smooth)
1402 {
1403 #ifndef WIN32
1404 rrd_file = fopen(filename,"r+");
1405 #else
1406 rrd_file = fopen(filename,"rb+");
1407 #endif
1408 rra_start = rra_begin;
1409 for (i = 0; i < rrd.stat_head -> rra_cnt; ++i)
1410 {
1411 if (cf_conv(rrd.rra_def[i].cf_nam) == CF_DEVSEASONAL ||
1412 cf_conv(rrd.rra_def[i].cf_nam) == CF_SEASONAL)
1413 {
1414 #ifdef DEBUG
1415 fprintf(stderr,"Running smoother for rra %ld\n",i);
1416 #endif
1417 apply_smoother(&rrd,i,rra_start,rrd_file);
1418 if (rrd_test_error())
1419 break;
1420 }
1421 rra_start += rrd.rra_def[i].row_cnt
1422 *rrd.stat_head->ds_cnt*sizeof(rrd_value_t);
1423 }
1424 fclose(rrd_file);
1425 }
1426 rrd_free(&rrd);
1427 free(updvals);
1428 free(tmpl_idx);
1429 free(pdp_new);
1430 free(pdp_temp);
1431 return(0);
1432 }
/*
 * Try to get an exclusive lock on the whole RRD file without blocking.
 * The lock gets removed when the file is closed.
 *
 * rrdfile: open stdio stream of the RRD file to lock.
 *
 * Returns 0 on success.  Otherwise returns the status reported by the
 * underlying locking call (fcntl() with F_SETLK, or _locking() on WIN32),
 * or -1 on WIN32 when the file size cannot be determined.
 */
int
LockRRD(FILE *rrdfile)
{
    int rrd_fd;         /* File descriptor for RRD */
    int lock_status;    /* result of the platform locking call */

    rrd_fd = fileno(rrdfile);

    {
#ifndef WIN32
        /* Zero the whole structure first so that l_pid and any
         * platform-specific members never hold indeterminate values
         * when the structure is handed to fcntl(). */
        struct flock lock = {0};

        lock.l_type = F_WRLCK;      /* exclusive write lock */
        lock.l_len = 0;             /* zero length: lock the whole file */
        lock.l_start = 0;           /* offset 0 ... */
        lock.l_whence = SEEK_SET;   /* ... measured from the start of file */

        /* F_SETLK fails immediately instead of waiting for the lock. */
        lock_status = fcntl(rrd_fd, F_SETLK, &lock);
#else
        struct _stat st;

        /* _locking() needs a byte count, so look up the file size first. */
        if ( _fstat( rrd_fd, &st ) == 0 ) {
            lock_status = _locking ( rrd_fd, _LK_NBLCK, st.st_size );
        } else {
            lock_status = -1;
        }
#endif
    }

    return(lock_status);
}
1472 info_t
1473 *write_RRA_row (rrd_t *rrd, unsigned long rra_idx, unsigned long *rra_current,
1474 unsigned short CDP_scratch_idx, FILE *rrd_file,
1475 info_t *pcdp_summary, time_t *rra_time)
1476 {
1477 unsigned long ds_idx, cdp_idx;
1478 infoval iv;
1480 for (ds_idx = 0; ds_idx < rrd -> stat_head -> ds_cnt; ds_idx++)
1481 {
1482 /* compute the cdp index */
1483 cdp_idx =rra_idx * (rrd -> stat_head->ds_cnt) + ds_idx;
1484 #ifdef DEBUG
1485 fprintf(stderr," -- RRA WRITE VALUE %e, at %ld CF:%s\n",
1486 rrd -> cdp_prep[cdp_idx].scratch[CDP_scratch_idx].u_val,ftell(rrd_file),
1487 rrd -> rra_def[rra_idx].cf_nam);
1488 #endif
1489 if (pcdp_summary != NULL)
1490 {
1491 iv.u_val = rrd -> cdp_prep[cdp_idx].scratch[CDP_scratch_idx].u_val;
1492 /* append info to the return hash */
1493 pcdp_summary = info_push(pcdp_summary,
1494 sprintf_alloc("[%d]RRA[%s][%lu]DS[%s]",
1495 *rra_time, rrd->rra_def[rra_idx].cf_nam,
1496 rrd->rra_def[rra_idx].pdp_cnt, rrd->ds_def[ds_idx].ds_nam),
1497 RD_I_VAL, iv);
1498 }
1499 if(fwrite(&(rrd -> cdp_prep[cdp_idx].scratch[CDP_scratch_idx].u_val),
1500 sizeof(rrd_value_t),1,rrd_file) != 1)
1501 {
1502 rrd_set_error("writing rrd");
1503 return 0;
1504 }
1505 *rra_current += sizeof(rrd_value_t);
1506 }
1507 return (pcdp_summary);
1508 }