/*****************************************************************************
 * RRDtool 1.3.2  Copyright by Tobi Oetiker, 1997-2008
 *****************************************************************************
 * rrd_hw.c : Support for Holt-Winters Smoothing/ Aberrant Behavior Detection
 *****************************************************************************
 * Initial version by Jake Brutlag, WebTV Networks, 5/1/00
 *****************************************************************************/

#include <stdlib.h>

#include "rrd_tool.h"
#include "rrd_hw.h"
#include "rrd_hw_math.h"
#include "rrd_hw_update.h"
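
/* index of the RRA on which RRA rra_idx depends,
 * e.g. a SEASONAL array depends on its HWPREDICT array */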
#define hw_dep_idx(rrd, rra_idx) rrd->rra_def[rra_idx].par[RRA_dependent_rra_idx].u_cnt

/* #define DEBUG */

/* private functions */
static unsigned long MyMod(
    signed long val,
    unsigned long mod);

int lookup_seasonal(
    rrd_t *rrd,
    unsigned long rra_idx,
    unsigned long rra_start,
    rrd_file_t *rrd_file,
    unsigned long offset,
    rrd_value_t **seasonal_coef)
{
    unsigned long pos_tmp;

    /* rra_ptr[].cur_row points to the rra row to be written; this function
     * reads cur_row + offset */
    unsigned long row_idx = rrd->rra_ptr[rra_idx].cur_row + offset;

    /* handle wrap around */
    if (row_idx >= rrd->rra_def[rra_idx].row_cnt)
        row_idx = row_idx % (rrd->rra_def[rra_idx].row_cnt);
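    /* e.g. with row_cnt = 100, cur_row = 98 and offset = 5,
     * row_idx wraps from 103 to 3 */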

    /* rra_start points to the appropriate rra block in the file */
    /* compute the pointer to the appropriate location in the file */
    pos_tmp =
        rra_start +
        (row_idx) * (rrd->stat_head->ds_cnt) * sizeof(rrd_value_t);

    /* allocate memory if need be */
    if (*seasonal_coef == NULL)
        *seasonal_coef =
            (rrd_value_t *) malloc((rrd->stat_head->ds_cnt) *
                                   sizeof(rrd_value_t));
    if (*seasonal_coef == NULL) {
        rrd_set_error("memory allocation failure: seasonal coef");
        return -1;
    }

    if (!rrd_seek(rrd_file, pos_tmp, SEEK_SET)) {
        if (rrd_read
            (rrd_file, *seasonal_coef,
             sizeof(rrd_value_t) * rrd->stat_head->ds_cnt)
            == (ssize_t) (sizeof(rrd_value_t) * rrd->stat_head->ds_cnt)) {
            /* success! */
            /* we can safely ignore the rule requiring a seek operation
             * between read and write, because this read moves the file
             * pointer to somewhere in the file other than the next write
             * location. */
            return 0;
        } else {
            rrd_set_error("read operation failed in lookup_seasonal(): %lu\n",
                          pos_tmp);
        }
    } else {
        rrd_set_error("seek operation failed in lookup_seasonal(): %lu\n",
                      pos_tmp);
    }

    return -1;
}

/* For the specified CDP prep area and the FAILURES RRA,
 * erase all history of past violations.
 */
void erase_violations(
    rrd_t *rrd,
    unsigned long cdp_idx,
    unsigned long rra_idx)
{
    unsigned short i;
    char     *violations_array;

    /* check that rra_idx is a CF_FAILURES array */
    if (cf_conv(rrd->rra_def[rra_idx].cf_nam) != CF_FAILURES) {
#ifdef DEBUG
        fprintf(stderr, "erase_violations called for non-FAILURES RRA: %s\n",
                rrd->rra_def[rra_idx].cf_nam);
#endif
        return;
    }
#ifdef DEBUG
    fprintf(stderr, "scratch buffer before erase:\n");
    for (i = 0; i < MAX_CDP_PAR_EN; i++) {
        fprintf(stderr, "%lu ", rrd->cdp_prep[cdp_idx].scratch[i].u_cnt);
    }
    fprintf(stderr, "\n");
#endif

    /* WARNING: an array of longs on disk is treated as an array of chars
     * in memory. */
    violations_array = (char *) ((void *) rrd->cdp_prep[cdp_idx].scratch);
    /* erase everything in the part of the CDP scratch array that will be
     * used to store violations for the current window */
    for (i = rrd->rra_def[rra_idx].par[RRA_window_len].u_cnt; i > 0; i--) {
        violations_array[i - 1] = 0;
    }
#ifdef DEBUG
    fprintf(stderr, "scratch buffer after erase:\n");
    for (i = 0; i < MAX_CDP_PAR_EN; i++) {
        fprintf(stderr, "%lu ", rrd->cdp_prep[cdp_idx].scratch[i].u_cnt);
    }
    fprintf(stderr, "\n");
#endif
}

/* Smooth a periodic array with a moving average: equal weights and
 * length = 5% of the period. */
int apply_smoother(
    rrd_t *rrd,
    unsigned long rra_idx,
    unsigned long rra_start,
    rrd_file_t *rrd_file)
{
    unsigned long i, j, k;
    unsigned long totalbytes;
    rrd_value_t *rrd_values;
    unsigned long row_length = rrd->stat_head->ds_cnt;
    unsigned long row_count = rrd->rra_def[rra_idx].row_cnt;
    unsigned long offset;
    FIFOqueue **buffers;
    rrd_value_t *working_average;
    rrd_value_t *baseline;
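
    /* rrd format version >= 4 stores the smoothing window as a fraction of
     * the season in the RRA parameters; older files use the fixed 5% default */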
    if (atoi(rrd->stat_head->version) >= 4) {
        offset = floor(rrd->rra_def[rra_idx].
                       par[RRA_seasonal_smoothing_window].
                       u_val / 2 * row_count);
    } else {
        offset = floor(0.05 / 2 * row_count);
    }
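    /* offset is half the window, e.g. a season of 288 rows with the 5%
     * default gives offset = floor(0.025 * 288) = 7, i.e. a centered
     * window of 2*7 + 1 = 15 rows */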

    if (offset == 0)
        return 0;       /* no smoothing */

    /* allocate memory */
    totalbytes = sizeof(rrd_value_t) * row_length * row_count;
    rrd_values = (rrd_value_t *) malloc(totalbytes);
    if (rrd_values == NULL) {
        rrd_set_error("apply smoother: memory allocation failure");
        return -1;
    }

    /* rra_start is at the beginning of this rra */
    if (rrd_seek(rrd_file, rra_start, SEEK_SET)) {
        rrd_set_error("seek to rra %lu failed", rra_start);
        free(rrd_values);
        return -1;
    }

    /* could read all data in a single block, but we need to
     * check for NA values */
    for (i = 0; i < row_count; ++i) {
        for (j = 0; j < row_length; ++j) {
            if (rrd_read
                (rrd_file, &(rrd_values[i * row_length + j]),
                 sizeof(rrd_value_t) * 1)
                != (ssize_t) (sizeof(rrd_value_t) * 1)) {
                rrd_set_error("reading value failed: %s",
                              rrd_strerror(errno));
                free(rrd_values);
                return -1;
            }
            if (isnan(rrd_values[i * row_length + j])) {
                /* can't apply smoothing, still uninitialized values */
#ifdef DEBUG
                fprintf(stderr,
                        "apply_smoother: NA detected in seasonal array: %lu %lu\n",
                        i, j);
#endif
                free(rrd_values);
                return 0;
            }
        }
    }

    /* allocate queues, one for each data source */
    buffers = (FIFOqueue **) malloc(sizeof(FIFOqueue *) * row_length);
    for (i = 0; i < row_length; ++i) {
        queue_alloc(&(buffers[i]), 2 * offset + 1);
    }
    /* need working average initialized to 0 */
    working_average = (rrd_value_t *) calloc(row_length, sizeof(rrd_value_t));
    baseline = (rrd_value_t *) calloc(row_length, sizeof(rrd_value_t));
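
    /* the smoother keeps a running sum over a circular window of
     * 2*offset + 1 rows; each queue holds the window contents for one
     * data source so the oldest term can be removed in O(1) */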
    /* compute sums of the first 2*offset terms */
    for (i = 0; i < 2 * offset; ++i) {
        k = MyMod((signed long) i - (signed long) offset, row_count);
        for (j = 0; j < row_length; ++j) {
            queue_push(buffers[j], rrd_values[k * row_length + j]);
            working_average[j] += rrd_values[k * row_length + j];
        }
    }

    /* compute moving averages */
    for (i = offset; i < row_count + offset; ++i) {
        for (j = 0; j < row_length; ++j) {
            k = MyMod(i, row_count);
            /* add a term to the sum */
            working_average[j] += rrd_values[k * row_length + j];
            queue_push(buffers[j], rrd_values[k * row_length + j]);

            /* reset k to be the center of the window */
            k = MyMod(i - offset, row_count);
            /* overwrite the rrd_values entry; the old value is already
             * saved in buffers */
            rrd_values[k * row_length + j] =
                working_average[j] / (2 * offset + 1);
            baseline[j] += rrd_values[k * row_length + j];

            /* remove a term from the sum */
            working_average[j] -= queue_pop(buffers[j]);
        }
    }

    for (i = 0; i < row_length; ++i) {
        queue_dealloc(buffers[i]);
        baseline[i] /= row_count;
    }
    free(buffers);
    free(working_average);
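
    /* for CF_SEASONAL arrays the smoothed values are re-centered: the
     * per-source mean (baseline) is folded into the Holt-Winters intercept
     * and removed from the seasonal coefficients (subtracted for the
     * additive model, divided out for the multiplicative model) */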
    if (cf_conv(rrd->rra_def[rra_idx].cf_nam) == CF_SEASONAL) {
        rrd_value_t (*init_seasonality) (rrd_value_t seasonal_coef,
                                         rrd_value_t intercept);

        switch (cf_conv(rrd->rra_def[hw_dep_idx(rrd, rra_idx)].cf_nam)) {
        case CF_HWPREDICT:
            init_seasonality = hw_additive_init_seasonality;
            break;
        case CF_MHWPREDICT:
            init_seasonality = hw_multiplicative_init_seasonality;
            break;
        default:
            rrd_set_error("apply smoother: SEASONAL rra doesn't have "
                          "valid dependency: %s",
                          rrd->rra_def[hw_dep_idx(rrd, rra_idx)].cf_nam);
            free(rrd_values);
            free(baseline);
            return -1;
        }

        for (j = 0; j < row_length; ++j) {
            for (i = 0; i < row_count; ++i) {
                rrd_values[i * row_length + j] =
                    init_seasonality(rrd_values[i * row_length + j],
                                     baseline[j]);
            }
            /* update the baseline coefficient;
             * first, compute the cdp_index. */
            offset = hw_dep_idx(rrd, rra_idx) * row_length + j;
            (rrd->cdp_prep[offset]).scratch[CDP_hw_intercept].u_val +=
                baseline[j];
        }
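
        /* the cdp_prep area sits after the static header blocks:
         * stat_head | ds_def[ds_cnt] | rra_def[rra_cnt] | live_head |
         * pdp_prep[ds_cnt] | cdp_prep[rra_cnt * ds_cnt] | rra_ptr[rra_cnt] |
         * rra data */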
        /* flush cdp to disk */
        if (rrd_seek(rrd_file, sizeof(stat_head_t) +
                     rrd->stat_head->ds_cnt * sizeof(ds_def_t) +
                     rrd->stat_head->rra_cnt * sizeof(rra_def_t) +
                     sizeof(live_head_t) +
                     rrd->stat_head->ds_cnt * sizeof(pdp_prep_t), SEEK_SET)) {
            rrd_set_error("apply_smoother: seek to cdp_prep failed");
            free(rrd_values);
            free(baseline);
            return -1;
        }
        if (rrd_write(rrd_file, rrd->cdp_prep,
                      sizeof(cdp_prep_t) *
                      (rrd->stat_head->rra_cnt) * rrd->stat_head->ds_cnt)
            != (ssize_t) (sizeof(cdp_prep_t) * (rrd->stat_head->rra_cnt) *
                          (rrd->stat_head->ds_cnt))) {
            rrd_set_error("apply_smoother: cdp_prep write failed");
            free(rrd_values);
            free(baseline);
            return -1;
        }
    }

    /* endif CF_SEASONAL */
    /* flush updated values to disk */
    if (rrd_seek(rrd_file, rra_start, SEEK_SET)) {
        rrd_set_error("apply_smoother: seek to pos %lu failed", rra_start);
        free(rrd_values);
        free(baseline);
        return -1;
    }
    /* write as a single block */
    if (rrd_write
        (rrd_file, rrd_values, sizeof(rrd_value_t) * row_length * row_count)
        != (ssize_t) (sizeof(rrd_value_t) * row_length * row_count)) {
        rrd_set_error("apply_smoother: write failed to %lu", rra_start);
        free(rrd_values);
        free(baseline);
        return -1;
    }

    free(rrd_values);
    free(baseline);
    return 0;
}

/* Reset aberrant behavior model coefficients, including intercept, slope,
 * seasonal, and seasonal deviation for the specified data source. */
void reset_aberrant_coefficients(
    rrd_t *rrd,
    rrd_file_t *rrd_file,
    unsigned long ds_idx)
{
    unsigned long cdp_idx, rra_idx, i;
    unsigned long cdp_start, rra_start;
    rrd_value_t nan_buffer = DNAN;

    /* compute the offset for the cdp area */
    cdp_start = sizeof(stat_head_t) +
        rrd->stat_head->ds_cnt * sizeof(ds_def_t) +
        rrd->stat_head->rra_cnt * sizeof(rra_def_t) +
        sizeof(live_head_t) + rrd->stat_head->ds_cnt * sizeof(pdp_prep_t);
    /* compute the offset for the first rra */
    rra_start = cdp_start +
        (rrd->stat_head->ds_cnt) * (rrd->stat_head->rra_cnt) *
        sizeof(cdp_prep_t) + rrd->stat_head->rra_cnt * sizeof(rra_ptr_t);

    /* loop over the RRAs */
    for (rra_idx = 0; rra_idx < rrd->stat_head->rra_cnt; rra_idx++) {
        cdp_idx = rra_idx * (rrd->stat_head->ds_cnt) + ds_idx;
        switch (cf_conv(rrd->rra_def[rra_idx].cf_nam)) {
        case CF_HWPREDICT:
        case CF_MHWPREDICT:
            init_hwpredict_cdp(&(rrd->cdp_prep[cdp_idx]));
            break;
        case CF_SEASONAL:
        case CF_DEVSEASONAL:
            /* don't use init_seasonal because it will reset burn-in, which
             * means different data sources will be calling for the smoother
             * at different times. */
            rrd->cdp_prep[cdp_idx].scratch[CDP_hw_seasonal].u_val = DNAN;
            rrd->cdp_prep[cdp_idx].scratch[CDP_hw_last_seasonal].u_val = DNAN;
            /* move to the first entry for this data source in this rra */
            rrd_seek(rrd_file, rra_start + ds_idx * sizeof(rrd_value_t),
                     SEEK_SET);
            /* entries for one data source are not contiguous on disk: each
             * row holds one value per data source, so write one value and
             * then skip over the other ds_cnt - 1 values in the row */
            for (i = 0; i < rrd->rra_def[rra_idx].row_cnt; ++i) {
                if (rrd_write(rrd_file, &nan_buffer, sizeof(rrd_value_t) * 1)
                    != (ssize_t) (sizeof(rrd_value_t) * 1)) {
                    rrd_set_error
                        ("reset_aberrant_coefficients: write failed data source %lu rra %s",
                         ds_idx, rrd->rra_def[rra_idx].cf_nam);
                    return;
                }
                rrd_seek(rrd_file, (rrd->stat_head->ds_cnt - 1) *
                         sizeof(rrd_value_t), SEEK_CUR);
            }
            break;
        case CF_FAILURES:
            erase_violations(rrd, cdp_idx, rra_idx);
            break;
        default:
            break;
        }
        /* move offset to the next rra */
        rra_start += rrd->rra_def[rra_idx].row_cnt * rrd->stat_head->ds_cnt *
            sizeof(rrd_value_t);
    }
    rrd_seek(rrd_file, cdp_start, SEEK_SET);
    if (rrd_write(rrd_file, rrd->cdp_prep,
                  sizeof(cdp_prep_t) *
                  (rrd->stat_head->rra_cnt) * rrd->stat_head->ds_cnt)
        != (ssize_t) (sizeof(cdp_prep_t) * (rrd->stat_head->rra_cnt) *
                      (rrd->stat_head->ds_cnt))) {
        rrd_set_error("reset_aberrant_coefficients: cdp_prep write failed");
    }
}

void init_hwpredict_cdp(
    cdp_prep_t *cdp)
{
    cdp->scratch[CDP_hw_intercept].u_val = DNAN;
    cdp->scratch[CDP_hw_last_intercept].u_val = DNAN;
    cdp->scratch[CDP_hw_slope].u_val = DNAN;
    cdp->scratch[CDP_hw_last_slope].u_val = DNAN;
    cdp->scratch[CDP_null_count].u_cnt = 1;
    cdp->scratch[CDP_last_null_count].u_cnt = 1;
}

void init_seasonal_cdp(
    cdp_prep_t *cdp)
{
    cdp->scratch[CDP_hw_seasonal].u_val = DNAN;
    cdp->scratch[CDP_hw_last_seasonal].u_val = DNAN;
    cdp->scratch[CDP_init_seasonal].u_cnt = 1;
}

int update_aberrant_CF(
    rrd_t *rrd,
    rrd_value_t pdp_val,
    enum cf_en current_cf,
    unsigned long cdp_idx,
    unsigned long rra_idx,
    unsigned long ds_idx,
    unsigned short CDP_scratch_idx,
    rrd_value_t *seasonal_coef)
{
    static hw_functions_t hw_multiplicative_functions = {
        hw_multiplicative_calculate_prediction,
        hw_multiplicative_calculate_intercept,
        hw_calculate_slope,
        hw_multiplicative_calculate_seasonality,
        hw_multiplicative_init_seasonality,
        hw_calculate_seasonal_deviation,
        hw_init_seasonal_deviation,
        1.0             /* identity value */
    };

    static hw_functions_t hw_additive_functions = {
        hw_additive_calculate_prediction,
        hw_additive_calculate_intercept,
        hw_calculate_slope,
        hw_additive_calculate_seasonality,
        hw_additive_init_seasonality,
        hw_calculate_seasonal_deviation,
        hw_init_seasonal_deviation,
        0.0             /* identity value */
    };
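
    /* the identity value is the "no seasonal effect" coefficient:
     * 0.0 for the additive model (the seasonal term is added to the
     * prediction) and 1.0 for the multiplicative model (the prediction
     * is scaled by the seasonal term) */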
    rrd->cdp_prep[cdp_idx].scratch[CDP_scratch_idx].u_val = pdp_val;
    switch (current_cf) {
    case CF_HWPREDICT:
        return update_hwpredict(rrd, cdp_idx, rra_idx, ds_idx,
                                CDP_scratch_idx, &hw_additive_functions);
    case CF_MHWPREDICT:
        return update_hwpredict(rrd, cdp_idx, rra_idx, ds_idx,
                                CDP_scratch_idx,
                                &hw_multiplicative_functions);
    case CF_DEVPREDICT:
        return update_devpredict(rrd, cdp_idx, rra_idx, ds_idx,
                                 CDP_scratch_idx);
    case CF_SEASONAL:
        switch (cf_conv(rrd->rra_def[hw_dep_idx(rrd, rra_idx)].cf_nam)) {
        case CF_HWPREDICT:
            return update_seasonal(rrd, cdp_idx, rra_idx, ds_idx,
                                   CDP_scratch_idx, seasonal_coef,
                                   &hw_additive_functions);
        case CF_MHWPREDICT:
            return update_seasonal(rrd, cdp_idx, rra_idx, ds_idx,
                                   CDP_scratch_idx, seasonal_coef,
                                   &hw_multiplicative_functions);
        default:
            return -1;
        }
    case CF_DEVSEASONAL:
        switch (cf_conv(rrd->rra_def[hw_dep_idx(rrd, rra_idx)].cf_nam)) {
        case CF_HWPREDICT:
            return update_devseasonal(rrd, cdp_idx, rra_idx, ds_idx,
                                      CDP_scratch_idx, seasonal_coef,
                                      &hw_additive_functions);
        case CF_MHWPREDICT:
            return update_devseasonal(rrd, cdp_idx, rra_idx, ds_idx,
                                      CDP_scratch_idx, seasonal_coef,
                                      &hw_multiplicative_functions);
        default:
            return -1;
        }
    case CF_FAILURES:
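        /* FAILURES depends on DEVSEASONAL, which in turn depends on
         * HWPREDICT or MHWPREDICT, so follow the dependency chain twice
         * to find the model type */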
        switch (cf_conv
                (rrd->rra_def[hw_dep_idx(rrd, hw_dep_idx(rrd, rra_idx))].
                 cf_nam)) {
        case CF_HWPREDICT:
            return update_failures(rrd, cdp_idx, rra_idx, ds_idx,
                                   CDP_scratch_idx, &hw_additive_functions);
        case CF_MHWPREDICT:
            return update_failures(rrd, cdp_idx, rra_idx, ds_idx,
                                   CDP_scratch_idx,
                                   &hw_multiplicative_functions);
        default:
            return -1;
        }
    case CF_AVERAGE:
    default:
        return 0;
    }
    return -1;
}

static unsigned long MyMod(
    signed long val,
    unsigned long mod)
{
    unsigned long new_val;

    if (val < 0)
        new_val = ((unsigned long) labs(val)) % mod;
    else
        new_val = (val % mod);

    if (val < 0 && new_val > 0)
        return (mod - new_val);
    else
        return (new_val);
}
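
/* MyMod behaves as a true mathematical modulus for negative values,
 * e.g. MyMod(-3, 10) == 7 and MyMod(13, 10) == 3 */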

/* a standard fixed-capacity FIFO queue implementation.
 * No overflow checking is performed. */
int queue_alloc(
    FIFOqueue **q,
    int capacity)
{
    *q = (FIFOqueue *) malloc(sizeof(FIFOqueue));
    if (*q == NULL)
        return -1;
    (*q)->queue = (rrd_value_t *) malloc(sizeof(rrd_value_t) * capacity);
    if ((*q)->queue == NULL) {
        free(*q);
        return -1;
    }
    (*q)->capacity = capacity;
    (*q)->head = capacity;
    (*q)->tail = 0;
    return 0;
}
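
/* head starts at capacity so that (head % capacity) == tail == 0,
 * i.e. a freshly allocated queue reports empty */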
int queue_isempty(
    FIFOqueue *q)
{
    return (q->head % q->capacity == q->tail);
}

void queue_push(
    FIFOqueue *q,
    rrd_value_t value)
{
    q->queue[(q->tail)++] = value;
    q->tail = q->tail % q->capacity;
}

rrd_value_t queue_pop(
    FIFOqueue *q)
{
    q->head = q->head % q->capacity;
    return q->queue[(q->head)++];
}

void queue_dealloc(
    FIFOqueue *q)
{
    free(q->queue);
    free(q);
}
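
/* usage sketch (illustrative):
 *
 *   FIFOqueue *q;
 *   if (queue_alloc(&q, 3) == 0) {
 *       queue_push(q, 1.0);
 *       queue_push(q, 2.0);
 *       queue_pop(q);        -- returns 1.0 (FIFO order)
 *       queue_dealloc(q);
 *   }
 */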