e9c75475b0ac2802a4964cac29df68d1442052ba
1 /*****************************************************************************
2 * RRDtool 1.1.x Copyright Tobias Oetiker, 1997 - 2002
3 *****************************************************************************
4 * rrd_update.c RRD Update Function
5 *****************************************************************************
6 * $Id$
7 * $Log$
8 * Revision 1.10 2003/04/29 19:14:12 jake
9 * Change updatev RRA return from index_number to cf_nam, pdp_cnt.
10 * Also revert accidental addition of -I to aclocal MakeMakefile.
11 *
12 * Revision 1.9 2003/04/25 18:35:08 jake
13 * Alternate update interface, updatev. Returns info about CDPs written to disk as result of update. Output format is similar to rrd_info, a hash of key-values.
14 *
15 * Revision 1.8 2003/03/31 21:22:12 oetiker
16 * enables RRDtool updates with microsecond or in case of windows millisecond
17 * precision. This is needed to reduce time measurement error when archive step
18 * is small. (<30s) -- Sasha Mikheev <sasha@avalon-net.co.il>
19 *
20 * Revision 1.7 2003/02/13 07:05:27 oetiker
21 * Find attached the patch I promised to send to you. Please note that there
22 * are three new source files (src/rrd_is_thread_safe.h, src/rrd_thread_safe.c
23 * and src/rrd_not_thread_safe.c) and the introduction of librrd_th. This
24 * library is identical to librrd, but it contains support code for per-thread
25 * global variables currently used for error information only. This is similar
26 * to how errno per-thread variables are implemented. librrd_th must be linked
27 * alongside of libpthred
28 *
29 * There is also a new file "THREADS", holding some documentation.
30 *
31 * -- Peter Stamfest <peter@stamfest.at>
32 *
33 * Revision 1.6 2002/02/01 20:34:49 oetiker
34 * fixed version number and date/time
35 *
36 * Revision 1.5 2001/05/09 05:31:01 oetiker
37 * Bug fix: when update of multiple PDP/CDP RRAs coincided
38 * with interpolation of multiple PDPs an incorrect value was
39 * stored as the CDP. Especially evident for GAUGE data sources.
40 * Minor changes to rrdcreate.pod. -- Jake Brutlag <jakeb@corp.webtv.net>
41 *
42 * Revision 1.4 2001/03/10 23:54:41 oetiker
43 * Support for COMPUTE data sources (CDEF data sources). Removes the RPN
44 * parser and calculator from rrd_graph and puts then in a new file,
45 * rrd_rpncalc.c. Changes to core files rrd_create and rrd_update. Some
46 * clean-up of aberrant behavior stuff, including a bug fix.
47 * Documentation update (rrdcreate.pod, rrdupdate.pod). Change xml format.
48 * -- Jake Brutlag <jakeb@corp.webtv.net>
49 *
50 * Revision 1.3 2001/03/04 13:01:55 oetiker
51 * Aberrant Behavior Detection support. A brief overview added to rrdtool.pod.
52 * Major updates to rrd_update.c, rrd_create.c. Minor update to other core files.
53 * This is backwards compatible! But new files using the Aberrant stuff are not readable
54 * by old rrdtool versions. See http://cricket.sourceforge.net/aberrant/rrd_hw.htm
55 * -- Jake Brutlag <jakeb@corp.webtv.net>
56 *
57 * Revision 1.2 2001/03/04 11:14:25 oetiker
58 * added at-style-time@value:value syntax to rrd_update
59 * -- Dave Bodenstab <imdave@mcs.net>
60 *
61 * Revision 1.1.1.1 2001/02/25 22:25:06 oetiker
62 * checkin
63 *
64 *****************************************************************************/
66 #include "rrd_tool.h"
67 #include <sys/types.h>
68 #include <fcntl.h>
70 #ifdef WIN32
71 #include <sys/locking.h>
72 #include <sys/stat.h>
73 #include <io.h>
74 #endif
76 #include "rrd_hw.h"
77 #include "rrd_rpncalc.h"
79 #include "rrd_is_thread_safe.h"
81 #ifdef WIN32
82 /*
83 * WIN32 does not have gettimeofday and struct timeval. This is a quick and dirty
84 * replacement.
85 */
86 #include <sys/timeb.h>
88 struct timeval {
89 time_t tv_sec; /* seconds */
90 long tv_usec; /* microseconds */
91 };
93 struct __timezone {
94 int tz_minuteswest; /* minutes W of Greenwich */
95 int tz_dsttime; /* type of dst correction */
96 };
98 static gettimeofday(struct timeval *t, struct __timezone *tz) {
100 struct timeb current_time;
102 _ftime(¤t_time);
104 t->tv_sec = current_time.time;
105 t->tv_usec = current_time.millitm * 1000;
106 }
108 #endif
/*
 * Normalize a time value as returned by gettimeofday(): when the
 * microsecond part is negative, one second's worth of microseconds is
 * added to tv_usec and one second is taken away from tv_sec.
 * A non-negative tv_usec is left untouched.
 */
static void normalize_time(struct timeval *t)
{
    if (t->tv_usec >= 0)
        return;                 /* nothing to fix */

    t->tv_usec += 1000000L;     /* borrow one second ... */
    t->tv_sec -= 1;             /* ... from the seconds part */
}
121 /* Local prototypes */
122 int LockRRD(FILE *rrd_file);
123 info_t *write_RRA_row (rrd_t *rrd, unsigned long rra_idx,
124 unsigned long *rra_current,
125 unsigned short CDP_scratch_idx, FILE *rrd_file,
126 info_t *pcdp_summary, time_t *rra_time);
127 int rrd_update_r(char *filename, char *template, int argc, char **argv);
128 int _rrd_update(char *filename, char *template, int argc, char **argv,
129 info_t*);
/* Yield X, or the fallback Y when X is NaN.
 * X is evaluated twice when it is not NaN, so pass only expressions without
 * side effects.  The expansion is a plain parenthesized expression (no
 * trailing semicolon), so it can be used inside larger expressions. */
#define IFDNAN(X,Y) (isnan(X) ? (Y) : (X))
134 #ifdef STANDALONE
/*
 * Stand-alone front end: hands the command line to rrd_update().  If an
 * error is pending afterwards, the usage text and the error message are
 * printed, the error is cleared and 1 is returned; otherwise 0.
 */
int
main(int argc, char **argv)
{
    static const char usage[] =
        "RRDtool 1.1.x Copyright 1997-2000 by Tobias Oetiker <tobi@oetiker.ch>\n\n"
        "Usage: rrdupdate filename\n"
        "\t\t\t[--template|-t ds-name:ds-name:...]\n"
        "\t\t\ttime|N:value[:value...]\n\n"
        "\t\t\tat-time@value[:value...]\n\n"
        "\t\t\t[ time:value[:value...] ..]\n\n";

    rrd_update(argc, argv);

    /* success is judged by the error state, not by the return value */
    if (!rrd_test_error())
        return 0;

    fputs(usage, stdout);
    printf("ERROR: %s\n", rrd_get_error());
    rrd_clear_error();
    return 1;
}
152 #endif
154 info_t *rrd_update_v(int argc, char **argv)
155 {
156 char *template = NULL;
157 info_t *result = NULL;
158 infoval rc;
160 while (1) {
161 static struct option long_options[] =
162 {
163 {"template", required_argument, 0, 't'},
164 {0,0,0,0}
165 };
166 int option_index = 0;
167 int opt;
168 opt = getopt_long(argc, argv, "t:",
169 long_options, &option_index);
171 if (opt == EOF)
172 break;
174 switch(opt) {
175 case 't':
176 template = optarg;
177 break;
179 case '?':
180 rrd_set_error("unknown option '%s'",argv[optind-1]);
181 rc.u_int = -1;
182 goto end_tag;
183 }
184 }
186 /* need at least 2 arguments: filename, data. */
187 if (argc-optind < 2) {
188 rrd_set_error("Not enough arguments");
189 rc.u_int = -1;
190 goto end_tag;
191 }
192 result = info_push(NULL,sprintf_alloc("return_value"),RD_I_INT,rc);
193 rc.u_int = _rrd_update(argv[optind], template,
194 argc - optind - 1, argv + optind + 1, result);
195 result->value.u_int = rc.u_int;
196 end_tag:
197 return result;
198 }
/*
 * Command-line style entry point for "update".
 *
 * Parses an optional --template/-t option with getopt_long(), then passes
 * the file name (first non-option argument) and the arguments following it
 * to rrd_update_r().
 *
 * Returns the result of rrd_update_r(), or -1 with the rrd error set when
 * an unknown option is seen or fewer than two arguments remain.
 */
int
rrd_update(int argc, char **argv)
{
    static struct option long_options[] =
    {
        {"template", required_argument, 0, 't'},
        {0,0,0,0}
    };
    char *ds_template = NULL;
    int remaining;

    for (;;) {
        int long_idx = 0;
        int c = getopt_long(argc, argv, "t:", long_options, &long_idx);

        if (c == EOF)
            break;

        if (c == 't') {
            ds_template = optarg;
        } else if (c == '?') {
            rrd_set_error("unknown option '%s'",argv[optind-1]);
            return -1;
        }
    }

    /* filename plus at least one data argument must be left */
    remaining = argc - optind;
    if (remaining < 2) {
        rrd_set_error("Not enough arguments");
        return -1;
    }

    return rrd_update_r(argv[optind], ds_template,
                        remaining - 1, argv + optind + 1);
}
/*
 * Update variant taking the file name, template and data arguments
 * directly.  Forwards to _rrd_update() with a NULL summary list and
 * returns its result unchanged.
 */
int
rrd_update_r(char *filename, char *template, int argc, char **argv)
{
    int status;

    status = _rrd_update(filename, template, argc, argv, NULL);
    return status;
}
249 int
250 _rrd_update(char *filename, char *template, int argc, char **argv,
251 info_t *pcdp_summary)
252 {
254 int arg_i = 2;
255 short j;
256 unsigned long i,ii,iii=1;
258 unsigned long rra_begin; /* byte pointer to the rra
259 * area in the rrd file. this
260 * pointer never changes value */
261 unsigned long rra_start; /* byte pointer to the rra
262 * area in the rrd file. this
263 * pointer changes as each rrd is
264 * processed. */
265 unsigned long rra_current; /* byte pointer to the current write
266 * spot in the rrd file. */
267 unsigned long rra_pos_tmp; /* temporary byte pointer. */
268 double interval,
269 pre_int,post_int; /* interval between this and
270 * the last run */
271 unsigned long proc_pdp_st; /* which pdp_st was the last
272 * to be processed */
273 unsigned long occu_pdp_st; /* when was the pdp_st
274 * before the last update
275 * time */
276 unsigned long proc_pdp_age; /* how old was the data in
277 * the pdp prep area when it
278 * was last updated */
279 unsigned long occu_pdp_age; /* how long ago was the last
280 * pdp_step time */
281 rrd_value_t *pdp_new; /* prepare the incoming data
282 * to be added to the
283 * existing entry */
284 rrd_value_t *pdp_temp; /* prepare the pdp values
285 * to be added to the
286 * cdp values */
288 long *tmpl_idx; /* index representing the settings
289 transported by the template index */
290 unsigned long tmpl_cnt = 2; /* time and data */
292 FILE *rrd_file;
293 rrd_t rrd;
294 time_t current_time;
295 time_t rra_time; /* time of update for a RRA */
296 unsigned long current_time_usec; /* microseconds part of current time */
297 struct timeval tmp_time; /* used for time conversion */
299 char **updvals;
300 int schedule_smooth = 0;
301 rrd_value_t *seasonal_coef = NULL, *last_seasonal_coef = NULL;
302 /* a vector of future Holt-Winters seasonal coefs */
303 unsigned long elapsed_pdp_st;
304 /* number of elapsed PDP steps since last update */
305 unsigned long *rra_step_cnt = NULL;
306 /* number of rows to be updated in an RRA for a data
307 * value. */
308 unsigned long start_pdp_offset;
309 /* number of PDP steps since the last update that
310 * are assigned to the first CDP to be generated
311 * since the last update. */
312 unsigned short scratch_idx;
313 /* index into the CDP scratch array */
314 enum cf_en current_cf;
315 /* numeric id of the current consolidation function */
316 rpnstack_t rpnstack; /* used for COMPUTE DS */
317 int version; /* rrd version */
319 rpnstack_init(&rpnstack);
321 /* need at least 1 arguments: data. */
322 if (argc < 1) {
323 rrd_set_error("Not enough arguments");
324 return -1;
325 }
329 if(rrd_open(filename,&rrd_file,&rrd, RRD_READWRITE)==-1){
330 return -1;
331 }
332 /* initialize time */
333 version = atoi(rrd.stat_head->version);
334 gettimeofday(&tmp_time, 0);
335 normalize_time(&tmp_time);
336 current_time = tmp_time.tv_sec;
337 if(version >= 3) {
338 current_time_usec = tmp_time.tv_usec;
339 }
340 else {
341 current_time_usec = 0;
342 }
344 rra_current = rra_start = rra_begin = ftell(rrd_file);
345 /* This is defined in the ANSI C standard, section 7.9.5.3:
347 When a file is opened with update mode ('+' as the second
348 or third character in the ... list of mode argument
349 variables), both input and output may be performed on the
350 associated stream. However, ... input may not be directly
351 followed by output without an intervening call to a file
352 positioning function, unless the input operation encounters
353 end-of-file. */
354 fseek(rrd_file, 0, SEEK_CUR);
357 /* get exclusive lock to whole file.
358 * lock gets removed when we close the file.
359 */
360 if (LockRRD(rrd_file) != 0) {
361 rrd_set_error("could not lock RRD");
362 rrd_free(&rrd);
363 fclose(rrd_file);
364 return(-1);
365 }
367 if((updvals = malloc( sizeof(char*) * (rrd.stat_head->ds_cnt+1)))==NULL){
368 rrd_set_error("allocating updvals pointer array");
369 rrd_free(&rrd);
370 fclose(rrd_file);
371 return(-1);
372 }
374 if ((pdp_temp = malloc(sizeof(rrd_value_t)
375 *rrd.stat_head->ds_cnt))==NULL){
376 rrd_set_error("allocating pdp_temp ...");
377 free(updvals);
378 rrd_free(&rrd);
379 fclose(rrd_file);
380 return(-1);
381 }
383 if ((tmpl_idx = malloc(sizeof(unsigned long)
384 *(rrd.stat_head->ds_cnt+1)))==NULL){
385 rrd_set_error("allocating tmpl_idx ...");
386 free(pdp_temp);
387 free(updvals);
388 rrd_free(&rrd);
389 fclose(rrd_file);
390 return(-1);
391 }
392 /* initialize template redirector */
393 /* default config example (assume DS 1 is a CDEF DS)
394 tmpl_idx[0] -> 0; (time)
395 tmpl_idx[1] -> 1; (DS 0)
396 tmpl_idx[2] -> 3; (DS 2)
397 tmpl_idx[3] -> 4; (DS 3) */
398 tmpl_idx[0] = 0; /* time */
399 for (i = 1, ii = 1 ; i <= rrd.stat_head->ds_cnt ; i++)
400 {
401 if (dst_conv(rrd.ds_def[i-1].dst) != DST_CDEF)
402 tmpl_idx[ii++]=i;
403 }
404 tmpl_cnt= ii;
406 if (template) {
407 char *dsname;
408 unsigned int tmpl_len;
409 dsname = template;
410 tmpl_cnt = 1; /* the first entry is the time */
411 tmpl_len = strlen(template);
412 for(i=0;i<=tmpl_len ;i++) {
413 if (template[i] == ':' || template[i] == '\0') {
414 template[i] = '\0';
415 if (tmpl_cnt>rrd.stat_head->ds_cnt){
416 rrd_set_error("Template contains more DS definitions than RRD");
417 free(updvals); free(pdp_temp);
418 free(tmpl_idx); rrd_free(&rrd);
419 fclose(rrd_file); return(-1);
420 }
421 if ((tmpl_idx[tmpl_cnt++] = ds_match(&rrd,dsname)) == -1){
422 rrd_set_error("unknown DS name '%s'",dsname);
423 free(updvals); free(pdp_temp);
424 free(tmpl_idx); rrd_free(&rrd);
425 fclose(rrd_file); return(-1);
426 } else {
427 /* the first element is always the time */
428 tmpl_idx[tmpl_cnt-1]++;
429 /* go to the next entry on the template */
430 dsname = &template[i+1];
431 /* fix the damage we did before */
432 if (i<tmpl_len) {
433 template[i]=':';
434 }
436 }
437 }
438 }
439 }
440 if ((pdp_new = malloc(sizeof(rrd_value_t)
441 *rrd.stat_head->ds_cnt))==NULL){
442 rrd_set_error("allocating pdp_new ...");
443 free(updvals);
444 free(pdp_temp);
445 free(tmpl_idx);
446 rrd_free(&rrd);
447 fclose(rrd_file);
448 return(-1);
449 }
451 /* loop through the arguments. */
452 for(arg_i=0; arg_i<argc;arg_i++) {
453 char *stepper = malloc((strlen(argv[arg_i])+1)*sizeof(char));
454 char *step_start = stepper;
455 char *p;
456 char *parsetime_error = NULL;
457 enum {atstyle, normal} timesyntax;
458 struct time_value ds_tv;
459 if (stepper == NULL){
460 rrd_set_error("failed duplication argv entry");
461 free(updvals);
462 free(pdp_temp);
463 free(tmpl_idx);
464 rrd_free(&rrd);
465 fclose(rrd_file);
466 return(-1);
467 }
468 /* initialize all ds input to unknown except the first one
469 which has always got to be set */
470 for(ii=1;ii<=rrd.stat_head->ds_cnt;ii++) updvals[ii] = "U";
471 strcpy(stepper,argv[arg_i]);
472 updvals[0]=stepper;
473 /* separate all ds elements; first must be examined separately
474 due to alternate time syntax */
475 if ((p=strchr(stepper,'@'))!=NULL) {
476 timesyntax = atstyle;
477 *p = '\0';
478 stepper = p+1;
479 } else if ((p=strchr(stepper,':'))!=NULL) {
480 timesyntax = normal;
481 *p = '\0';
482 stepper = p+1;
483 } else {
484 rrd_set_error("expected timestamp not found in data source from %s:...",
485 argv[arg_i]);
486 free(step_start);
487 break;
488 }
489 ii=1;
490 updvals[tmpl_idx[ii]] = stepper;
491 while (*stepper) {
492 if (*stepper == ':') {
493 *stepper = '\0';
494 ii++;
495 if (ii<tmpl_cnt){
496 updvals[tmpl_idx[ii]] = stepper+1;
497 }
498 }
499 stepper++;
500 }
502 if (ii != tmpl_cnt-1) {
503 rrd_set_error("expected %lu data source readings (got %lu) from %s:...",
504 tmpl_cnt-1, ii, argv[arg_i]);
505 free(step_start);
506 break;
507 }
509 /* get the time from the reading ... handle N */
510 if (timesyntax == atstyle) {
511 if ((parsetime_error = parsetime(updvals[0], &ds_tv))) {
512 rrd_set_error("ds time: %s: %s", updvals[0], parsetime_error );
513 free(step_start);
514 break;
515 }
516 if (ds_tv.type == RELATIVE_TO_END_TIME ||
517 ds_tv.type == RELATIVE_TO_START_TIME) {
518 rrd_set_error("specifying time relative to the 'start' "
519 "or 'end' makes no sense here: %s",
520 updvals[0]);
521 free(step_start);
522 break;
523 }
525 current_time = mktime(&ds_tv.tm) + ds_tv.offset;
526 current_time_usec = 0; /* FIXME: how to handle usecs here ? */
528 } else if (strcmp(updvals[0],"N")==0){
529 gettimeofday(&tmp_time, 0);
530 normalize_time(&tmp_time);
531 current_time = tmp_time.tv_sec;
532 current_time_usec = tmp_time.tv_usec;
533 } else {
534 double tmp;
535 tmp = strtod(updvals[0], 0);
536 current_time = floor(tmp);
537 current_time_usec = (long)((tmp - current_time) * 1000000L);
538 }
539 /* dont do any correction for old version RRDs */
540 if(version < 3)
541 current_time_usec = 0;
543 if(current_time <= rrd.live_head->last_up){
544 rrd_set_error("illegal attempt to update using time %ld when "
545 "last update time is %ld (minimum one second step)",
546 current_time, rrd.live_head->last_up);
547 free(step_start);
548 break;
549 }
552 /* seek to the beginning of the rra's */
553 if (rra_current != rra_begin) {
554 if(fseek(rrd_file, rra_begin, SEEK_SET) != 0) {
555 rrd_set_error("seek error in rrd");
556 free(step_start);
557 break;
558 }
559 rra_current = rra_begin;
560 }
561 rra_start = rra_begin;
563 /* when was the current pdp started */
564 proc_pdp_age = rrd.live_head->last_up % rrd.stat_head->pdp_step;
565 proc_pdp_st = rrd.live_head->last_up - proc_pdp_age;
567 /* when did the last pdp_st occur */
568 occu_pdp_age = current_time % rrd.stat_head->pdp_step;
569 occu_pdp_st = current_time - occu_pdp_age;
570 /* interval = current_time - rrd.live_head->last_up; */
571 interval = current_time + ((double)current_time_usec - (double)rrd.live_head->last_up_usec)/1000000.0 - rrd.live_head->last_up;
573 if (occu_pdp_st > proc_pdp_st){
574 /* OK we passed the pdp_st moment*/
575 pre_int = occu_pdp_st - rrd.live_head->last_up; /* how much of the input data
576 * occurred before the latest
577 * pdp_st moment*/
578 pre_int -= ((double)rrd.live_head->last_up_usec)/1000000.0; /* adjust usecs */
579 post_int = occu_pdp_age; /* how much after it */
580 post_int += ((double)current_time_usec)/1000000.0; /* adjust usecs */
581 } else {
582 pre_int = interval;
583 post_int = 0;
584 }
586 #ifdef DEBUG
587 printf(
588 "proc_pdp_age %lu\t"
589 "proc_pdp_st %lu\t"
590 "occu_pfp_age %lu\t"
591 "occu_pdp_st %lu\t"
592 "int %lf\t"
593 "pre_int %lf\t"
594 "post_int %lf\n", proc_pdp_age, proc_pdp_st,
595 occu_pdp_age, occu_pdp_st,
596 interval, pre_int, post_int);
597 #endif
599 /* process the data sources and update the pdp_prep
600 * area accordingly */
601 for(i=0;i<rrd.stat_head->ds_cnt;i++){
602 enum dst_en dst_idx;
603 dst_idx= dst_conv(rrd.ds_def[i].dst);
604 /* NOTE: DST_CDEF should never enter this if block, because
605 * updvals[i+1][0] is initialized to 'U'; unless the caller
606 * accidently specified a value for the DST_CDEF. To handle
607 * this case, an extra check is required. */
608 if((updvals[i+1][0] != 'U') &&
609 (dst_idx != DST_CDEF) &&
610 rrd.ds_def[i].par[DS_mrhb_cnt].u_cnt >= interval) {
611 double rate = DNAN;
612 /* the data source type defines how to process the data */
613 /* pdp_new contains rate * time ... eg the bytes
614 * transferred during the interval. Doing it this way saves
615 * a lot of math operations */
618 switch(dst_idx){
619 case DST_COUNTER:
620 case DST_DERIVE:
621 if(rrd.pdp_prep[i].last_ds[0] != 'U'){
622 pdp_new[i]= rrd_diff(updvals[i+1],rrd.pdp_prep[i].last_ds);
623 if(dst_idx == DST_COUNTER) {
624 /* simple overflow catcher sugestet by andres kroonmaa */
625 /* this will fail terribly for non 32 or 64 bit counters ... */
626 /* are there any others in SNMP land ? */
627 if (pdp_new[i] < (double)0.0 )
628 pdp_new[i] += (double)4294967296.0 ; /* 2^32 */
629 if (pdp_new[i] < (double)0.0 )
630 pdp_new[i] += (double)18446744069414584320.0; /* 2^64-2^32 */;
631 }
632 rate = pdp_new[i] / interval;
633 }
634 else {
635 pdp_new[i]= DNAN;
636 }
637 break;
638 case DST_ABSOLUTE:
639 pdp_new[i]= atof(updvals[i+1]);
640 rate = pdp_new[i] / interval;
641 break;
642 case DST_GAUGE:
643 pdp_new[i] = atof(updvals[i+1]) * interval;
644 rate = pdp_new[i] / interval;
645 break;
646 default:
647 rrd_set_error("rrd contains unknown DS type : '%s'",
648 rrd.ds_def[i].dst);
649 break;
650 }
651 /* break out of this for loop if the error string is set */
652 if (rrd_test_error()){
653 break;
654 }
655 /* make sure pdp_temp is neither too large or too small
656 * if any of these occur it becomes unknown ...
657 * sorry folks ... */
658 if ( ! isnan(rate) &&
659 (( ! isnan(rrd.ds_def[i].par[DS_max_val].u_val) &&
660 rate > rrd.ds_def[i].par[DS_max_val].u_val ) ||
661 ( ! isnan(rrd.ds_def[i].par[DS_min_val].u_val) &&
662 rate < rrd.ds_def[i].par[DS_min_val].u_val ))){
663 pdp_new[i] = DNAN;
664 }
665 } else {
666 /* no news is news all the same */
667 pdp_new[i] = DNAN;
668 }
670 /* make a copy of the command line argument for the next run */
671 #ifdef DEBUG
672 fprintf(stderr,
673 "prep ds[%lu]\t"
674 "last_arg '%s'\t"
675 "this_arg '%s'\t"
676 "pdp_new %10.2f\n",
677 i,
678 rrd.pdp_prep[i].last_ds,
679 updvals[i+1], pdp_new[i]);
680 #endif
681 if(dst_idx == DST_COUNTER || dst_idx == DST_DERIVE){
682 strncpy(rrd.pdp_prep[i].last_ds,
683 updvals[i+1],LAST_DS_LEN-1);
684 rrd.pdp_prep[i].last_ds[LAST_DS_LEN-1]='\0';
685 }
686 }
687 /* break out of the argument parsing loop if the error_string is set */
688 if (rrd_test_error()){
689 free(step_start);
690 break;
691 }
692 /* has a pdp_st moment occurred since the last run ? */
694 if (proc_pdp_st == occu_pdp_st){
695 /* no we have not passed a pdp_st moment. therefore update is simple */
697 for(i=0;i<rrd.stat_head->ds_cnt;i++){
698 if(isnan(pdp_new[i]))
699 rrd.pdp_prep[i].scratch[PDP_unkn_sec_cnt].u_cnt += interval;
700 else
701 rrd.pdp_prep[i].scratch[PDP_val].u_val+= pdp_new[i];
702 #ifdef DEBUG
703 fprintf(stderr,
704 "NO PDP ds[%lu]\t"
705 "value %10.2f\t"
706 "unkn_sec %5lu\n",
707 i,
708 rrd.pdp_prep[i].scratch[PDP_val].u_val,
709 rrd.pdp_prep[i].scratch[PDP_unkn_sec_cnt].u_cnt);
710 #endif
711 }
712 } else {
713 /* an pdp_st has occurred. */
715 /* in pdp_prep[].scratch[PDP_val].u_val we have collected rate*seconds which
716 * occurred up to the last run.
717 pdp_new[] contains rate*seconds from the latest run.
718 pdp_temp[] will contain the rate for cdp */
720 for(i=0;i<rrd.stat_head->ds_cnt;i++){
721 /* update pdp_prep to the current pdp_st */
722 if(isnan(pdp_new[i]))
723 rrd.pdp_prep[i].scratch[PDP_unkn_sec_cnt].u_cnt += pre_int;
724 else
725 rrd.pdp_prep[i].scratch[PDP_val].u_val +=
726 pdp_new[i]/(double)interval*(double)pre_int;
728 /* if too much of the pdp_prep is unknown we dump it */
729 if ((rrd.pdp_prep[i].scratch[PDP_unkn_sec_cnt].u_cnt
730 > rrd.ds_def[i].par[DS_mrhb_cnt].u_cnt) ||
731 (occu_pdp_st-proc_pdp_st <=
732 rrd.pdp_prep[i].scratch[PDP_unkn_sec_cnt].u_cnt)) {
733 pdp_temp[i] = DNAN;
734 } else {
735 pdp_temp[i] = rrd.pdp_prep[i].scratch[PDP_val].u_val
736 / (double)( occu_pdp_st
737 - proc_pdp_st
738 - rrd.pdp_prep[i].scratch[PDP_unkn_sec_cnt].u_cnt);
739 }
741 /* process CDEF data sources; remember each CDEF DS can
742 * only reference other DS with a lower index number */
743 if (dst_conv(rrd.ds_def[i].dst) == DST_CDEF) {
744 rpnp_t *rpnp;
745 rpnp = rpn_expand((rpn_cdefds_t *) &(rrd.ds_def[i].par[DS_cdef]));
746 /* substitue data values for OP_VARIABLE nodes */
747 for (ii = 0; rpnp[ii].op != OP_END; ii++)
748 {
749 if (rpnp[ii].op == OP_VARIABLE) {
750 rpnp[ii].op = OP_NUMBER;
751 rpnp[ii].val = pdp_temp[rpnp[ii].ptr];
752 }
753 }
754 /* run the rpn calculator */
755 if (rpn_calc(rpnp,&rpnstack,0,pdp_temp,i) == -1) {
756 free(rpnp);
757 break; /* exits the data sources pdp_temp loop */
758 }
759 }
761 /* make pdp_prep ready for the next run */
762 if(isnan(pdp_new[i])){
763 rrd.pdp_prep[i].scratch[PDP_unkn_sec_cnt].u_cnt = post_int;
764 rrd.pdp_prep[i].scratch[PDP_val].u_val = 0.0;
765 } else {
766 rrd.pdp_prep[i].scratch[PDP_unkn_sec_cnt].u_cnt = 0;
767 rrd.pdp_prep[i].scratch[PDP_val].u_val =
768 pdp_new[i]/(double)interval*(double)post_int;
769 }
771 #ifdef DEBUG
772 fprintf(stderr,
773 "PDP UPD ds[%lu]\t"
774 "pdp_temp %10.2f\t"
775 "new_prep %10.2f\t"
776 "new_unkn_sec %5lu\n",
777 i, pdp_temp[i],
778 rrd.pdp_prep[i].scratch[PDP_val].u_val,
779 rrd.pdp_prep[i].scratch[PDP_unkn_sec_cnt].u_cnt);
780 #endif
781 }
783 /* if there were errors during the last loop, bail out here */
784 if (rrd_test_error()){
785 free(step_start);
786 break;
787 }
789 /* compute the number of elapsed pdp_st moments */
790 elapsed_pdp_st = (occu_pdp_st - proc_pdp_st) / rrd.stat_head -> pdp_step;
791 #ifdef DEBUG
792 fprintf(stderr,"elapsed PDP steps: %lu\n", elapsed_pdp_st);
793 #endif
794 if (rra_step_cnt == NULL)
795 {
796 rra_step_cnt = (unsigned long *)
797 malloc((rrd.stat_head->rra_cnt)* sizeof(unsigned long));
798 }
800 for(i = 0, rra_start = rra_begin;
801 i < rrd.stat_head->rra_cnt;
802 rra_start += rrd.rra_def[i].row_cnt * rrd.stat_head -> ds_cnt * sizeof(rrd_value_t),
803 i++)
804 {
805 current_cf = cf_conv(rrd.rra_def[i].cf_nam);
806 start_pdp_offset = rrd.rra_def[i].pdp_cnt -
807 (proc_pdp_st / rrd.stat_head -> pdp_step) % rrd.rra_def[i].pdp_cnt;
808 if (start_pdp_offset <= elapsed_pdp_st) {
809 rra_step_cnt[i] = (elapsed_pdp_st - start_pdp_offset) /
810 rrd.rra_def[i].pdp_cnt + 1;
811 } else {
812 rra_step_cnt[i] = 0;
813 }
815 if (current_cf == CF_SEASONAL || current_cf == CF_DEVSEASONAL)
816 {
817 /* If this is a bulk update, we need to skip ahead in the seasonal
818 * arrays so that they will be correct for the next observed value;
819 * note that for the bulk update itself, no update will occur to
820 * DEVSEASONAL or SEASONAL; futhermore, HWPREDICT and DEVPREDICT will
821 * be set to DNAN. */
822 if (rra_step_cnt[i] > 2)
823 {
824 /* skip update by resetting rra_step_cnt[i],
825 * note that this is not data source specific; this is due
826 * to the bulk update, not a DNAN value for the specific data
827 * source. */
828 rra_step_cnt[i] = 0;
829 lookup_seasonal(&rrd,i,rra_start,rrd_file,elapsed_pdp_st,
830 &last_seasonal_coef);
831 lookup_seasonal(&rrd,i,rra_start,rrd_file,elapsed_pdp_st + 1,
832 &seasonal_coef);
833 }
835 /* periodically run a smoother for seasonal effects */
836 /* Need to use first cdp parameter buffer to track
837 * burnin (burnin requires a specific smoothing schedule).
838 * The CDP_init_seasonal parameter is really an RRA level,
839 * not a data source within RRA level parameter, but the rra_def
840 * is read only for rrd_update (not flushed to disk). */
841 iii = i*(rrd.stat_head -> ds_cnt);
842 if (rrd.cdp_prep[iii].scratch[CDP_init_seasonal].u_cnt
843 <= BURNIN_CYCLES)
844 {
845 if (rrd.rra_ptr[i].cur_row + elapsed_pdp_st
846 > rrd.rra_def[i].row_cnt - 1) {
847 /* mark off one of the burnin cycles */
848 ++(rrd.cdp_prep[iii].scratch[CDP_init_seasonal].u_cnt);
849 schedule_smooth = 1;
850 }
851 } else {
852 /* someone has no doubt invented a trick to deal with this
853 * wrap around, but at least this code is clear. */
854 if (rrd.rra_def[i].par[RRA_seasonal_smooth_idx].u_cnt >
855 rrd.rra_ptr[i].cur_row)
856 {
857 /* here elapsed_pdp_st = rra_step_cnt[i] because of 1-1
858 * mapping between PDP and CDP */
859 if (rrd.rra_ptr[i].cur_row + elapsed_pdp_st
860 >= rrd.rra_def[i].par[RRA_seasonal_smooth_idx].u_cnt)
861 {
862 #ifdef DEBUG
863 fprintf(stderr,
864 "schedule_smooth 1: cur_row %lu, elapsed_pdp_st %lu, smooth idx %lu\n",
865 rrd.rra_ptr[i].cur_row, elapsed_pdp_st,
866 rrd.rra_def[i].par[RRA_seasonal_smooth_idx].u_cnt);
867 #endif
868 schedule_smooth = 1;
869 }
870 } else {
871 /* can't rely on negative numbers because we are working with
872 * unsigned values */
873 /* Don't need modulus here. If we've wrapped more than once, only
874 * one smooth is executed at the end. */
875 if (rrd.rra_ptr[i].cur_row + elapsed_pdp_st >= rrd.rra_def[i].row_cnt
876 && rrd.rra_ptr[i].cur_row + elapsed_pdp_st - rrd.rra_def[i].row_cnt
877 >= rrd.rra_def[i].par[RRA_seasonal_smooth_idx].u_cnt)
878 {
879 #ifdef DEBUG
880 fprintf(stderr,
881 "schedule_smooth 2: cur_row %lu, elapsed_pdp_st %lu, smooth idx %lu\n",
882 rrd.rra_ptr[i].cur_row, elapsed_pdp_st,
883 rrd.rra_def[i].par[RRA_seasonal_smooth_idx].u_cnt);
884 #endif
885 schedule_smooth = 1;
886 }
887 }
888 }
890 rra_current = ftell(rrd_file);
891 } /* if cf is DEVSEASONAL or SEASONAL */
893 if (rrd_test_error()) break;
895 /* update CDP_PREP areas */
896 /* loop over data soures within each RRA */
897 for(ii = 0;
898 ii < rrd.stat_head->ds_cnt;
899 ii++)
900 {
902 /* iii indexes the CDP prep area for this data source within the RRA */
903 iii=i*rrd.stat_head->ds_cnt+ii;
905 if (rrd.rra_def[i].pdp_cnt > 1) {
907 if (rra_step_cnt[i] > 0) {
908 /* If we are in this block, as least 1 CDP value will be written to
909 * disk, this is the CDP_primary_val entry. If more than 1 value needs
910 * to be written, then the "fill in" value is the CDP_secondary_val
911 * entry. */
912 if (isnan(pdp_temp[ii]))
913 {
914 rrd.cdp_prep[iii].scratch[CDP_unkn_pdp_cnt].u_cnt += start_pdp_offset;
915 rrd.cdp_prep[iii].scratch[CDP_secondary_val].u_val = DNAN;
916 } else {
917 /* CDP_secondary value is the RRA "fill in" value for intermediary
918 * CDP data entries. No matter the CF, the value is the same because
919 * the average, max, min, and last of a list of identical values is
920 * the same, namely, the value itself. */
921 rrd.cdp_prep[iii].scratch[CDP_secondary_val].u_val = pdp_temp[ii];
922 }
924 if (rrd.cdp_prep[iii].scratch[CDP_unkn_pdp_cnt].u_cnt
925 > rrd.rra_def[i].pdp_cnt*
926 rrd.rra_def[i].par[RRA_cdp_xff_val].u_val)
927 {
928 rrd.cdp_prep[iii].scratch[CDP_primary_val].u_val = DNAN;
929 /* initialize carry over */
930 if (current_cf == CF_AVERAGE) {
931 if (isnan(pdp_temp[ii])) {
932 rrd.cdp_prep[iii].scratch[CDP_val].u_val = DNAN;
933 } else {
934 rrd.cdp_prep[iii].scratch[CDP_val].u_val = pdp_temp[ii] *
935 ((elapsed_pdp_st - start_pdp_offset) % rrd.rra_def[i].pdp_cnt);
936 }
937 } else {
938 rrd.cdp_prep[iii].scratch[CDP_val].u_val = pdp_temp[ii];
939 }
940 } else {
941 rrd_value_t cum_val, cur_val;
942 switch (current_cf) {
943 case CF_AVERAGE:
944 cum_val = IFDNAN(rrd.cdp_prep[iii].scratch[CDP_val].u_val, 0.0);
945 cur_val = IFDNAN(pdp_temp[ii],0.0);
946 rrd.cdp_prep[iii].scratch[CDP_primary_val].u_val =
947 (cum_val + cur_val * start_pdp_offset) /
948 (rrd.rra_def[i].pdp_cnt
949 -rrd.cdp_prep[iii].scratch[CDP_unkn_pdp_cnt].u_cnt);
950 /* initialize carry over value */
951 if (isnan(pdp_temp[ii])) {
952 rrd.cdp_prep[iii].scratch[CDP_val].u_val = DNAN;
953 } else {
954 rrd.cdp_prep[iii].scratch[CDP_val].u_val = pdp_temp[ii] *
955 ((elapsed_pdp_st - start_pdp_offset) % rrd.rra_def[i].pdp_cnt);
956 }
957 break;
958 case CF_MAXIMUM:
959 cum_val = IFDNAN(rrd.cdp_prep[iii].scratch[CDP_val].u_val, -DINF);
960 cur_val = IFDNAN(pdp_temp[ii],-DINF);
961 #ifdef DEBUG
962 if (isnan(rrd.cdp_prep[iii].scratch[CDP_val].u_val) &&
963 isnan(pdp_temp[ii])) {
964 fprintf(stderr,
965 "RRA %lu, DS %lu, both CDP_val and pdp_temp are DNAN!",
966 i,ii);
967 exit(-1);
968 }
969 #endif
970 if (cur_val > cum_val)
971 rrd.cdp_prep[iii].scratch[CDP_primary_val].u_val = cur_val;
972 else
973 rrd.cdp_prep[iii].scratch[CDP_primary_val].u_val = cum_val;
974 /* initialize carry over value */
975 rrd.cdp_prep[iii].scratch[CDP_val].u_val = pdp_temp[ii];
976 break;
977 case CF_MINIMUM:
978 cum_val = IFDNAN(rrd.cdp_prep[iii].scratch[CDP_val].u_val, DINF);
979 cur_val = IFDNAN(pdp_temp[ii],DINF);
980 #ifdef DEBUG
981 if (isnan(rrd.cdp_prep[iii].scratch[CDP_val].u_val) &&
982 isnan(pdp_temp[ii])) {
983 fprintf(stderr,
984 "RRA %lu, DS %lu, both CDP_val and pdp_temp are DNAN!",
985 i,ii);
986 exit(-1);
987 }
988 #endif
989 if (cur_val < cum_val)
990 rrd.cdp_prep[iii].scratch[CDP_primary_val].u_val = cur_val;
991 else
992 rrd.cdp_prep[iii].scratch[CDP_primary_val].u_val = cum_val;
993 /* initialize carry over value */
994 rrd.cdp_prep[iii].scratch[CDP_val].u_val = pdp_temp[ii];
995 break;
996 case CF_LAST:
997 default:
998 rrd.cdp_prep[iii].scratch[CDP_primary_val].u_val = pdp_temp[ii];
999 /* initialize carry over value */
1000 rrd.cdp_prep[iii].scratch[CDP_val].u_val = pdp_temp[ii];
1001 break;
1002 }
1003 } /* endif meets xff value requirement for a valid value */
1004 /* initialize carry over CDP_unkn_pdp_cnt, this must after CDP_primary_val
1005 * is set because CDP_unkn_pdp_cnt is required to compute that value. */
1006 if (isnan(pdp_temp[ii]))
1007 rrd.cdp_prep[iii].scratch[CDP_unkn_pdp_cnt].u_cnt =
1008 (elapsed_pdp_st - start_pdp_offset) % rrd.rra_def[i].pdp_cnt;
1009 else
1010 rrd.cdp_prep[iii].scratch[CDP_unkn_pdp_cnt].u_cnt = 0;
1011 } else /* rra_step_cnt[i] == 0 */
1012 {
1013 #ifdef DEBUG
1014 if (isnan(rrd.cdp_prep[iii].scratch[CDP_val].u_val)) {
1015 fprintf(stderr,"schedule CDP_val update, RRA %lu DS %lu, DNAN\n",
1016 i,ii);
1017 } else {
1018 fprintf(stderr,"schedule CDP_val update, RRA %lu DS %lu, %10.2f\n",
1019 i,ii,rrd.cdp_prep[iii].scratch[CDP_val].u_val);
1020 }
1021 #endif
1022 if (isnan(pdp_temp[ii])) {
1023 rrd.cdp_prep[iii].scratch[CDP_unkn_pdp_cnt].u_cnt += elapsed_pdp_st;
1024 } else if (isnan(rrd.cdp_prep[iii].scratch[CDP_val].u_val))
1025 {
1026 if (current_cf == CF_AVERAGE) {
1027 rrd.cdp_prep[iii].scratch[CDP_val].u_val = pdp_temp[ii] *
1028 elapsed_pdp_st;
1029 } else {
1030 rrd.cdp_prep[iii].scratch[CDP_val].u_val = pdp_temp[ii];
1031 }
1032 #ifdef DEBUG
1033 fprintf(stderr,"Initialize CDP_val for RRA %lu DS %lu: %10.2f\n",
1034 i,ii,rrd.cdp_prep[iii].scratch[CDP_val].u_val);
1035 #endif
1036 } else {
1037 switch (current_cf) {
1038 case CF_AVERAGE:
1039 rrd.cdp_prep[iii].scratch[CDP_val].u_val += pdp_temp[ii] *
1040 elapsed_pdp_st;
1041 break;
1042 case CF_MINIMUM:
1043 if (pdp_temp[ii] < rrd.cdp_prep[iii].scratch[CDP_val].u_val)
1044 rrd.cdp_prep[iii].scratch[CDP_val].u_val = pdp_temp[ii];
1045 break;
1046 case CF_MAXIMUM:
1047 if (pdp_temp[ii] > rrd.cdp_prep[iii].scratch[CDP_val].u_val)
1048 rrd.cdp_prep[iii].scratch[CDP_val].u_val = pdp_temp[ii];
1049 break;
1050 case CF_LAST:
1051 default:
1052 rrd.cdp_prep[iii].scratch[CDP_val].u_val = pdp_temp[ii];
1053 break;
1054 }
1055 }
1056 }
1057 } else { /* rrd.rra_def[i].pdp_cnt == 1 */
1058 if (elapsed_pdp_st > 2)
1059 {
1060 switch (current_cf) {
1061 case CF_AVERAGE:
1062 default:
1063 rrd.cdp_prep[iii].scratch[CDP_primary_val].u_val=pdp_temp[ii];
1064 rrd.cdp_prep[iii].scratch[CDP_secondary_val].u_val=pdp_temp[ii];
1065 break;
1066 case CF_SEASONAL:
1067 case CF_DEVSEASONAL:
1068 /* need to update cached seasonal values, so they are consistent
1069 * with the bulk update */
1070 /* WARNING: code relies on the fact that CDP_hw_last_seasonal and
1071 * CDP_last_deviation are the same. */
1072 rrd.cdp_prep[iii].scratch[CDP_hw_last_seasonal].u_val =
1073 last_seasonal_coef[ii];
1074 rrd.cdp_prep[iii].scratch[CDP_hw_seasonal].u_val =
1075 seasonal_coef[ii];
1076 break;
1077 case CF_HWPREDICT:
1078 /* need to update the null_count and last_null_count.
1079 * even do this for non-DNAN pdp_temp because the
1080 * algorithm is not learning from batch updates. */
1081 rrd.cdp_prep[iii].scratch[CDP_null_count].u_cnt +=
1082 elapsed_pdp_st;
1083 rrd.cdp_prep[iii].scratch[CDP_last_null_count].u_cnt +=
1084 elapsed_pdp_st - 1;
1085 /* fall through */
1086 case CF_DEVPREDICT:
1087 rrd.cdp_prep[iii].scratch[CDP_primary_val].u_val = DNAN;
1088 rrd.cdp_prep[iii].scratch[CDP_secondary_val].u_val = DNAN;
1089 break;
1090 case CF_FAILURES:
1091 /* do not count missed bulk values as failures */
1092 rrd.cdp_prep[iii].scratch[CDP_primary_val].u_val = 0;
1093 rrd.cdp_prep[iii].scratch[CDP_secondary_val].u_val = 0;
1094 /* need to reset violations buffer.
1095 * could do this more carefully, but for now, just
1096 * assume a bulk update wipes away all violations. */
1097 erase_violations(&rrd, iii, i);
1098 break;
1099 }
1100 }
1101 } /* endif rrd.rra_def[i].pdp_cnt == 1 */
1103 if (rrd_test_error()) break;
1105 } /* endif data sources loop */
1106 } /* end RRA Loop */
1108 /* this loop is only entered if elapsed_pdp_st < 3 */
1109 for (j = elapsed_pdp_st, scratch_idx = CDP_primary_val;
1110 j > 0 && j < 3; j--, scratch_idx = CDP_secondary_val)
1111 {
1112 for(i = 0, rra_start = rra_begin;
1113 i < rrd.stat_head->rra_cnt;
1114 rra_start += rrd.rra_def[i].row_cnt * rrd.stat_head -> ds_cnt * sizeof(rrd_value_t),
1115 i++)
1116 {
1117 if (rrd.rra_def[i].pdp_cnt > 1) continue;
1119 current_cf = cf_conv(rrd.rra_def[i].cf_nam);
1120 if (current_cf == CF_SEASONAL || current_cf == CF_DEVSEASONAL)
1121 {
1122 lookup_seasonal(&rrd,i,rra_start,rrd_file,
1123 elapsed_pdp_st + (scratch_idx == CDP_primary_val ? 1 : 2),
1124 &seasonal_coef);
1125 rra_current = ftell(rrd_file);
1126 }
1127 if (rrd_test_error()) break;
1128 /* loop over data soures within each RRA */
1129 for(ii = 0;
1130 ii < rrd.stat_head->ds_cnt;
1131 ii++)
1132 {
1133 update_aberrant_CF(&rrd,pdp_temp[ii],current_cf,
1134 i*(rrd.stat_head->ds_cnt) + ii,i,ii,
1135 scratch_idx, seasonal_coef);
1136 }
1137 } /* end RRA Loop */
1138 if (rrd_test_error()) break;
1139 } /* end elapsed_pdp_st loop */
1141 if (rrd_test_error()) break;
1143 /* Ready to write to disk */
1144 /* Move sequentially through the file, writing one RRA at a time.
1145 * Note this architecture divorces the computation of CDP with
1146 * flushing updated RRA entries to disk. */
1147 for(i = 0, rra_start = rra_begin;
1148 i < rrd.stat_head->rra_cnt;
1149 rra_start += rrd.rra_def[i].row_cnt * rrd.stat_head -> ds_cnt * sizeof(rrd_value_t),
1150 i++) {
1151 /* is there anything to write for this RRA? If not, continue. */
1152 if (rra_step_cnt[i] == 0) continue;
1154 /* write the first row */
1155 #ifdef DEBUG
1156 fprintf(stderr," -- RRA Preseek %ld\n",ftell(rrd_file));
1157 #endif
1158 rrd.rra_ptr[i].cur_row++;
1159 if (rrd.rra_ptr[i].cur_row >= rrd.rra_def[i].row_cnt)
1160 rrd.rra_ptr[i].cur_row = 0; /* wrap around */
1161 /* positition on the first row */
1162 rra_pos_tmp = rra_start +
1163 (rrd.stat_head->ds_cnt)*(rrd.rra_ptr[i].cur_row)*sizeof(rrd_value_t);
1164 if(rra_pos_tmp != rra_current) {
1165 if(fseek(rrd_file, rra_pos_tmp, SEEK_SET) != 0){
1166 rrd_set_error("seek error in rrd");
1167 break;
1168 }
1169 rra_current = rra_pos_tmp;
1170 }
1172 #ifdef DEBUG
1173 fprintf(stderr," -- RRA Postseek %ld\n",ftell(rrd_file));
1174 #endif
1175 scratch_idx = CDP_primary_val;
1176 if (pcdp_summary != NULL)
1177 {
1178 rra_time = (current_time - current_time
1179 % (rrd.rra_def[i].pdp_cnt*rrd.stat_head->pdp_step))
1180 - ((rra_step_cnt[i]-1)*rrd.rra_def[i].pdp_cnt*rrd.stat_head->pdp_step);
1181 }
1182 pcdp_summary = write_RRA_row(&rrd, i, &rra_current, scratch_idx, rrd_file,
1183 pcdp_summary, &rra_time);
1184 if (rrd_test_error()) break;
1186 /* write other rows of the bulk update, if any */
1187 scratch_idx = CDP_secondary_val;
1188 for ( ; rra_step_cnt[i] > 1;
1189 rra_step_cnt[i]--, rrd.rra_ptr[i].cur_row++)
1190 {
1191 if (rrd.rra_ptr[i].cur_row == rrd.rra_def[i].row_cnt)
1192 {
1193 #ifdef DEBUG
1194 fprintf(stderr,"Wraparound for RRA %s, %lu updates left\n",
1195 rrd.rra_def[i].cf_nam, rra_step_cnt[i] - 1);
1196 #endif
1197 /* wrap */
1198 rrd.rra_ptr[i].cur_row = 0;
1199 /* seek back to beginning of current rra */
1200 if (fseek(rrd_file, rra_start, SEEK_SET) != 0)
1201 {
1202 rrd_set_error("seek error in rrd");
1203 break;
1204 }
1205 #ifdef DEBUG
1206 fprintf(stderr," -- Wraparound Postseek %ld\n",ftell(rrd_file));
1207 #endif
1208 rra_current = rra_start;
1209 }
1210 if (pcdp_summary != NULL)
1211 {
1212 rra_time = (current_time - current_time
1213 % (rrd.rra_def[i].pdp_cnt*rrd.stat_head->pdp_step))
1214 - ((rra_step_cnt[i]-2)*rrd.rra_def[i].pdp_cnt*rrd.stat_head->pdp_step);
1215 }
1216 pcdp_summary = write_RRA_row(&rrd, i, &rra_current, scratch_idx, rrd_file,
1217 pcdp_summary, &rra_time);
1218 }
1220 if (rrd_test_error())
1221 break;
1222 } /* RRA LOOP */
1224 /* break out of the argument parsing loop if error_string is set */
1225 if (rrd_test_error()){
1226 free(step_start);
1227 break;
1228 }
1230 } /* endif a pdp_st has occurred */
1231 rrd.live_head->last_up = current_time;
1232 rrd.live_head->last_up_usec = current_time_usec;
1233 free(step_start);
1234 } /* function argument loop */
1236 if (seasonal_coef != NULL) free(seasonal_coef);
1237 if (last_seasonal_coef != NULL) free(last_seasonal_coef);
1238 if (rra_step_cnt != NULL) free(rra_step_cnt);
1239 rpnstack_free(&rpnstack);
1241 /* if we got here and if there is an error and if the file has not been
1242 * written to, then close things up and return. */
1243 if (rrd_test_error()) {
1244 free(updvals);
1245 free(tmpl_idx);
1246 rrd_free(&rrd);
1247 free(pdp_temp);
1248 free(pdp_new);
1249 fclose(rrd_file);
1250 return(-1);
1251 }
1253 /* aargh ... that was tough ... so many loops ... anyway, its done.
1254 * we just need to write back the live header portion now*/
1256 if (fseek(rrd_file, (sizeof(stat_head_t)
1257 + sizeof(ds_def_t)*rrd.stat_head->ds_cnt
1258 + sizeof(rra_def_t)*rrd.stat_head->rra_cnt),
1259 SEEK_SET) != 0) {
1260 rrd_set_error("seek rrd for live header writeback");
1261 free(updvals);
1262 free(tmpl_idx);
1263 rrd_free(&rrd);
1264 free(pdp_temp);
1265 free(pdp_new);
1266 fclose(rrd_file);
1267 return(-1);
1268 }
1270 if(version >= 3) {
1271 if(fwrite( rrd.live_head,
1272 sizeof(live_head_t), 1, rrd_file) != 1){
1273 rrd_set_error("fwrite live_head to rrd");
1274 free(updvals);
1275 rrd_free(&rrd);
1276 free(tmpl_idx);
1277 free(pdp_temp);
1278 free(pdp_new);
1279 fclose(rrd_file);
1280 return(-1);
1281 }
1282 }
1283 else {
1284 if(fwrite( &rrd.live_head->last_up,
1285 sizeof(time_t), 1, rrd_file) != 1){
1286 rrd_set_error("fwrite live_head to rrd");
1287 free(updvals);
1288 rrd_free(&rrd);
1289 free(tmpl_idx);
1290 free(pdp_temp);
1291 free(pdp_new);
1292 fclose(rrd_file);
1293 return(-1);
1294 }
1295 }
1298 if(fwrite( rrd.pdp_prep,
1299 sizeof(pdp_prep_t),
1300 rrd.stat_head->ds_cnt, rrd_file) != rrd.stat_head->ds_cnt){
1301 rrd_set_error("ftwrite pdp_prep to rrd");
1302 free(updvals);
1303 rrd_free(&rrd);
1304 free(tmpl_idx);
1305 free(pdp_temp);
1306 free(pdp_new);
1307 fclose(rrd_file);
1308 return(-1);
1309 }
1311 if(fwrite( rrd.cdp_prep,
1312 sizeof(cdp_prep_t),
1313 rrd.stat_head->rra_cnt *rrd.stat_head->ds_cnt, rrd_file)
1314 != rrd.stat_head->rra_cnt *rrd.stat_head->ds_cnt){
1316 rrd_set_error("ftwrite cdp_prep to rrd");
1317 free(updvals);
1318 free(tmpl_idx);
1319 rrd_free(&rrd);
1320 free(pdp_temp);
1321 free(pdp_new);
1322 fclose(rrd_file);
1323 return(-1);
1324 }
1326 if(fwrite( rrd.rra_ptr,
1327 sizeof(rra_ptr_t),
1328 rrd.stat_head->rra_cnt,rrd_file) != rrd.stat_head->rra_cnt){
1329 rrd_set_error("fwrite rra_ptr to rrd");
1330 free(updvals);
1331 free(tmpl_idx);
1332 rrd_free(&rrd);
1333 free(pdp_temp);
1334 free(pdp_new);
1335 fclose(rrd_file);
1336 return(-1);
1337 }
1339 /* OK now close the files and free the memory */
1340 if(fclose(rrd_file) != 0){
1341 rrd_set_error("closing rrd");
1342 free(updvals);
1343 free(tmpl_idx);
1344 rrd_free(&rrd);
1345 free(pdp_temp);
1346 free(pdp_new);
1347 return(-1);
1348 }
1350 /* calling the smoothing code here guarantees at most
1351 * one smoothing operation per rrd_update call. Unfortunately,
1352 * it is possible with bulk updates, or a long-delayed update
1353 * for smoothing to occur off-schedule. This really isn't
1354 * critical except during the burning cycles. */
1355 if (schedule_smooth)
1356 {
1357 #ifndef WIN32
1358 rrd_file = fopen(filename,"r+");
1359 #else
1360 rrd_file = fopen(filename,"rb+");
1361 #endif
1362 rra_start = rra_begin;
1363 for (i = 0; i < rrd.stat_head -> rra_cnt; ++i)
1364 {
1365 if (cf_conv(rrd.rra_def[i].cf_nam) == CF_DEVSEASONAL ||
1366 cf_conv(rrd.rra_def[i].cf_nam) == CF_SEASONAL)
1367 {
1368 #ifdef DEBUG
1369 fprintf(stderr,"Running smoother for rra %ld\n",i);
1370 #endif
1371 apply_smoother(&rrd,i,rra_start,rrd_file);
1372 if (rrd_test_error())
1373 break;
1374 }
1375 rra_start += rrd.rra_def[i].row_cnt
1376 *rrd.stat_head->ds_cnt*sizeof(rrd_value_t);
1377 }
1378 fclose(rrd_file);
1379 }
1380 rrd_free(&rrd);
1381 free(updvals);
1382 free(tmpl_idx);
1383 free(pdp_new);
1384 free(pdp_temp);
1385 return(0);
1386 }
/*
 * Try to take an exclusive lock on the whole RRD file.
 *
 * The lock request does not wait: F_SETLK (POSIX) and _LK_NBLCK (WIN32)
 * both fail immediately when the lock cannot be obtained.
 * The lock gets removed when the file is closed.
 *
 * rrdfile  open stream of the RRD file to lock
 *
 * returns 0 on success; otherwise the raw result of the locking call
 * (or -1 if _fstat fails on WIN32).
 */
int
LockRRD(FILE *rrdfile)
{
    int rrd_fd;   /* file descriptor underlying rrdfile */
    int status;   /* result of the locking call; not named "stat" so the
                   * stat() library function is not shadowed */

    rrd_fd = fileno(rrdfile);

    {
#ifndef WIN32
        /* zero all fields so nothing indeterminate is handed to fcntl */
        struct flock lock = {0};

        lock.l_type   = F_WRLCK;    /* exclusive write lock */
        lock.l_whence = SEEK_SET;   /* l_start is relative to start of file */
        lock.l_start  = 0;          /* begin at offset 0 */
        lock.l_len    = 0;          /* 0 means: through to end of file */

        status = fcntl(rrd_fd, F_SETLK, &lock);
#else
        struct _stat st;

        /* _locking takes a byte count, so the file size is needed first */
        if ( _fstat( rrd_fd, &st ) == 0 ) {
            status = _locking ( rrd_fd, _LK_NBLCK, st.st_size );
        } else {
            status = -1;
        }
#endif
    }

    return(status);
}
1426 info_t
1427 *write_RRA_row (rrd_t *rrd, unsigned long rra_idx, unsigned long *rra_current,
1428 unsigned short CDP_scratch_idx, FILE *rrd_file,
1429 info_t *pcdp_summary, time_t *rra_time)
1430 {
1431 unsigned long ds_idx, cdp_idx;
1432 infoval iv;
1434 for (ds_idx = 0; ds_idx < rrd -> stat_head -> ds_cnt; ds_idx++)
1435 {
1436 /* compute the cdp index */
1437 cdp_idx =rra_idx * (rrd -> stat_head->ds_cnt) + ds_idx;
1438 #ifdef DEBUG
1439 fprintf(stderr," -- RRA WRITE VALUE %e, at %ld CF:%s\n",
1440 rrd -> cdp_prep[cdp_idx].scratch[CDP_scratch_idx].u_val,ftell(rrd_file),
1441 rrd -> rra_def[rra_idx].cf_nam);
1442 #endif
1443 if (pcdp_summary != NULL)
1444 {
1445 iv.u_val = rrd -> cdp_prep[cdp_idx].scratch[CDP_scratch_idx].u_val;
1446 /* append info to the return hash */
1447 pcdp_summary = info_push(pcdp_summary,
1448 sprintf_alloc("[%d]RRA[%s][%lu]DS[%s]",
1449 *rra_time, rrd->rra_def[rra_idx].cf_nam,
1450 rrd->rra_def[rra_idx].pdp_cnt, rrd->ds_def[ds_idx].ds_nam),
1451 RD_I_VAL, iv);
1452 }
1453 if(fwrite(&(rrd -> cdp_prep[cdp_idx].scratch[CDP_scratch_idx].u_val),
1454 sizeof(rrd_value_t),1,rrd_file) != 1)
1455 {
1456 rrd_set_error("writing rrd");
1457 return;
1458 }
1459 *rra_current += sizeof(rrd_value_t);
1460 }
1461 return (pcdp_summary);
1462 }