Code

e95fa082ae99243f15298eceaf1c599ecb64a55b
[pkg-rrdtool.git] / src / rrd_hw.c
1 /*****************************************************************************
2  * RRDtool 1.3.5  Copyright by Tobi Oetiker, 1997-2008
3  *****************************************************************************
4  * rrd_hw.c : Support for Holt-Winters Smoothing/ Aberrant Behavior Detection
5  *****************************************************************************
6  * Initial version by Jake Brutlag, WebTV Networks, 5/1/00
7  *****************************************************************************/
9 #include "rrd_tool.h"
10 #include "rrd_hw.h"
11 #include "rrd_hw_math.h"
12 #include "rrd_hw_update.h"
14 #ifdef WIN32
15 #include <stdlib.h>
16 #endif
/* index of the RRA that a Holt-Winters family RRA depends on; the macro
 * argument and the whole expansion are parenthesized so the macro is safe
 * in any expression context */
#define hw_dep_idx(rrd, rra_idx) ((rrd)->rra_def[rra_idx].par[RRA_dependent_rra_idx].u_cnt)
20 /* #define DEBUG */
22 /* private functions */
23 static unsigned long MyMod(
24     signed long val,
25     unsigned long mod);
27 int lookup_seasonal(
28     rrd_t *rrd,
29     unsigned long rra_idx,
30     unsigned long rra_start,
31     rrd_file_t *rrd_file,
32     unsigned long offset,
33     rrd_value_t **seasonal_coef)
34 {
35     unsigned long pos_tmp;
37     /* rra_ptr[].cur_row points to the rra row to be written; this function
38      * reads cur_row + offset */
39     unsigned long row_idx = rrd->rra_ptr[rra_idx].cur_row + offset;
41     /* handle wrap around */
42     if (row_idx >= rrd->rra_def[rra_idx].row_cnt)
43         row_idx = row_idx % (rrd->rra_def[rra_idx].row_cnt);
45     /* rra_start points to the appropriate rra block in the file */
46     /* compute the pointer to the appropriate location in the file */
47     pos_tmp =
48         rra_start +
49         (row_idx) * (rrd->stat_head->ds_cnt) * sizeof(rrd_value_t);
51     /* allocate memory if need be */
52     if (*seasonal_coef == NULL)
53         *seasonal_coef =
54             (rrd_value_t *) malloc((rrd->stat_head->ds_cnt) *
55                                    sizeof(rrd_value_t));
56     if (*seasonal_coef == NULL) {
57         rrd_set_error("memory allocation failure: seasonal coef");
58         return -1;
59     }
61     if (!rrd_seek(rrd_file, pos_tmp, SEEK_SET)) {
62         if (rrd_read
63             (rrd_file, *seasonal_coef,
64              sizeof(rrd_value_t) * rrd->stat_head->ds_cnt)
65             == (ssize_t) (sizeof(rrd_value_t) * rrd->stat_head->ds_cnt)) {
66             /* success! */
67             /* we can safely ignore the rule requiring a seek operation between read
68              * and write, because this read moves the file pointer to somewhere
69              * in the file other than the next write location.
70              * */
71             return 0;
72         } else {
73             rrd_set_error("read operation failed in lookup_seasonal(): %lu\n",
74                           pos_tmp);
75         }
76     } else {
77         rrd_set_error("seek operation failed in lookup_seasonal(): %lu\n",
78                       pos_tmp);
79     }
81     return -1;
82 }
84 /* For the specified CDP prep area and the FAILURES RRA,
85  * erase all history of past violations.
86  */
87 void erase_violations(
88     rrd_t *rrd,
89     unsigned long cdp_idx,
90     unsigned long rra_idx)
91 {
92     unsigned short i;
93     char     *violations_array;
95     /* check that rra_idx is a CF_FAILURES array */
96     if (cf_conv(rrd->rra_def[rra_idx].cf_nam) != CF_FAILURES) {
97 #ifdef DEBUG
98         fprintf(stderr, "erase_violations called for non-FAILURES RRA: %s\n",
99                 rrd->rra_def[rra_idx].cf_nam);
100 #endif
101         return;
102     }
103 #ifdef DEBUG
104     fprintf(stderr, "scratch buffer before erase:\n");
105     for (i = 0; i < MAX_CDP_PAR_EN; i++) {
106         fprintf(stderr, "%lu ", rrd->cdp_prep[cdp_idx].scratch[i].u_cnt);
107     }
108     fprintf(stderr, "\n");
109 #endif
111     /* WARNING: an array of longs on disk is treated as an array of chars
112      * in memory. */
113     violations_array = (char *) ((void *) rrd->cdp_prep[cdp_idx].scratch);
114     /* erase everything in the part of the CDP scratch array that will be
115      * used to store violations for the current window */
116     for (i = rrd->rra_def[rra_idx].par[RRA_window_len].u_cnt; i > 0; i--) {
117         violations_array[i - 1] = 0;
118     }
119 #ifdef DEBUG
120     fprintf(stderr, "scratch buffer after erase:\n");
121     for (i = 0; i < MAX_CDP_PAR_EN; i++) {
122         fprintf(stderr, "%lu ", rrd->cdp_prep[cdp_idx].scratch[i].u_cnt);
123     }
124     fprintf(stderr, "\n");
125 #endif
128 /* Smooth a periodic array with a moving average: equal weights and
129  * length = 5% of the period. */
130 int apply_smoother(
131     rrd_t *rrd,
132     unsigned long rra_idx,
133     unsigned long rra_start,
134     rrd_file_t *rrd_file)
136     unsigned long i, j, k;
137     unsigned long totalbytes;
138     rrd_value_t *rrd_values;
139     unsigned long row_length = rrd->stat_head->ds_cnt;
140     unsigned long row_count = rrd->rra_def[rra_idx].row_cnt;
141     unsigned long offset;
142     FIFOqueue **buffers;
143     rrd_value_t *working_average;
144     rrd_value_t *baseline;
146     if (atoi(rrd->stat_head->version) >= 4) {
147         offset = floor(rrd->rra_def[rra_idx].
148                        par[RRA_seasonal_smoothing_window].
149                        u_val / 2 * row_count);
150     } else {
151         offset = floor(0.05 / 2 * row_count);
152     }
154     if (offset == 0)
155         return 0;       /* no smoothing */
157     /* allocate memory */
158     totalbytes = sizeof(rrd_value_t) * row_length * row_count;
159     rrd_values = (rrd_value_t *) malloc(totalbytes);
160     if (rrd_values == NULL) {
161         rrd_set_error("apply smoother: memory allocation failure");
162         return -1;
163     }
165     /* rra_start is at the beginning of this rra */
166     if (rrd_seek(rrd_file, rra_start, SEEK_SET)) {
167         rrd_set_error("seek to rra %d failed", rra_start);
168         free(rrd_values);
169         return -1;
170     }
171     rrd_flush(rrd_file);
172     /* could read all data in a single block, but we need to
173      * check for NA values */
174     for (i = 0; i < row_count; ++i) {
175         for (j = 0; j < row_length; ++j) {
176             if (rrd_read
177                 (rrd_file, &(rrd_values[i * row_length + j]),
178                  sizeof(rrd_value_t) * 1)
179                 != (ssize_t) (sizeof(rrd_value_t) * 1)) {
180                 rrd_set_error("reading value failed: %s",
181                               rrd_strerror(errno));
182             }
183             if (isnan(rrd_values[i * row_length + j])) {
184                 /* can't apply smoothing, still uninitialized values */
185 #ifdef DEBUG
186                 fprintf(stderr,
187                         "apply_smoother: NA detected in seasonal array: %ld %ld\n",
188                         i, j);
189 #endif
190                 free(rrd_values);
191                 return 0;
192             }
193         }
194     }
196     /* allocate queues, one for each data source */
197     buffers = (FIFOqueue **) malloc(sizeof(FIFOqueue *) * row_length);
198     for (i = 0; i < row_length; ++i) {
199         queue_alloc(&(buffers[i]), 2 * offset + 1);
200     }
201     /* need working average initialized to 0 */
202     working_average = (rrd_value_t *) calloc(row_length, sizeof(rrd_value_t));
203     baseline = (rrd_value_t *) calloc(row_length, sizeof(rrd_value_t));
205     /* compute sums of the first 2*offset terms */
206     for (i = 0; i < 2 * offset; ++i) {
207         k = MyMod(i - offset, row_count);
208         for (j = 0; j < row_length; ++j) {
209             queue_push(buffers[j], rrd_values[k * row_length + j]);
210             working_average[j] += rrd_values[k * row_length + j];
211         }
212     }
214     /* compute moving averages */
215     for (i = offset; i < row_count + offset; ++i) {
216         for (j = 0; j < row_length; ++j) {
217             k = MyMod(i, row_count);
218             /* add a term to the sum */
219             working_average[j] += rrd_values[k * row_length + j];
220             queue_push(buffers[j], rrd_values[k * row_length + j]);
222             /* reset k to be the center of the window */
223             k = MyMod(i - offset, row_count);
224             /* overwrite rdd_values entry, the old value is already
225              * saved in buffers */
226             rrd_values[k * row_length + j] =
227                 working_average[j] / (2 * offset + 1);
228             baseline[j] += rrd_values[k * row_length + j];
230             /* remove a term from the sum */
231             working_average[j] -= queue_pop(buffers[j]);
232         }
233     }
235     for (i = 0; i < row_length; ++i) {
236         queue_dealloc(buffers[i]);
237         baseline[i] /= row_count;
238     }
239     free(buffers);
240     free(working_average);
242     if (cf_conv(rrd->rra_def[rra_idx].cf_nam) == CF_SEASONAL) {
243         rrd_value_t (
244     *init_seasonality) (
245     rrd_value_t seasonal_coef,
246     rrd_value_t intercept);
248         switch (cf_conv(rrd->rra_def[hw_dep_idx(rrd, rra_idx)].cf_nam)) {
249         case CF_HWPREDICT:
250             init_seasonality = hw_additive_init_seasonality;
251             break;
252         case CF_MHWPREDICT:
253             init_seasonality = hw_multiplicative_init_seasonality;
254             break;
255         default:
256             rrd_set_error("apply smoother: SEASONAL rra doesn't have "
257                           "valid dependency: %s",
258                           rrd->rra_def[hw_dep_idx(rrd, rra_idx)].cf_nam);
259             return -1;
260         }
262         for (j = 0; j < row_length; ++j) {
263             for (i = 0; i < row_count; ++i) {
264                 rrd_values[i * row_length + j] =
265                     init_seasonality(rrd_values[i * row_length + j],
266                                      baseline[j]);
267             }
268             /* update the baseline coefficient,
269              * first, compute the cdp_index. */
270             offset = hw_dep_idx(rrd, rra_idx) * row_length + j;
271             (rrd->cdp_prep[offset]).scratch[CDP_hw_intercept].u_val +=
272                 baseline[j];
273         }
274         /* flush cdp to disk */
275         rrd_flush(rrd_file);
276         if (rrd_seek(rrd_file, sizeof(stat_head_t) +
277                      rrd->stat_head->ds_cnt * sizeof(ds_def_t) +
278                      rrd->stat_head->rra_cnt * sizeof(rra_def_t) +
279                      sizeof(live_head_t) +
280                      rrd->stat_head->ds_cnt * sizeof(pdp_prep_t), SEEK_SET)) {
281             rrd_set_error("apply_smoother: seek to cdp_prep failed");
282             free(rrd_values);
283             return -1;
284         }
285         if (rrd_write(rrd_file, rrd->cdp_prep,
286                       sizeof(cdp_prep_t) *
287                       (rrd->stat_head->rra_cnt) * rrd->stat_head->ds_cnt)
288             != (ssize_t) (sizeof(cdp_prep_t) * (rrd->stat_head->rra_cnt) *
289                           (rrd->stat_head->ds_cnt))) {
290             rrd_set_error("apply_smoother: cdp_prep write failed");
291             free(rrd_values);
292             return -1;
293         }
294     }
296     /* endif CF_SEASONAL */
297     /* flush updated values to disk */
298     rrd_flush(rrd_file);
299     if (rrd_seek(rrd_file, rra_start, SEEK_SET)) {
300         rrd_set_error("apply_smoother: seek to pos %d failed", rra_start);
301         free(rrd_values);
302         return -1;
303     }
304     /* write as a single block */
305     if (rrd_write
306         (rrd_file, rrd_values, sizeof(rrd_value_t) * row_length * row_count)
307         != (ssize_t) (sizeof(rrd_value_t) * row_length * row_count)) {
308         rrd_set_error("apply_smoother: write failed to %lu", rra_start);
309         free(rrd_values);
310         return -1;
311     }
313     rrd_flush(rrd_file);
314     free(rrd_values);
315     free(baseline);
316     return 0;
319 /* Reset aberrant behavior model coefficients, including intercept, slope,
320  * seasonal, and seasonal deviation for the specified data source. */
321 void reset_aberrant_coefficients(
322     rrd_t *rrd,
323     rrd_file_t *rrd_file,
324     unsigned long ds_idx)
326     unsigned long cdp_idx, rra_idx, i;
327     unsigned long cdp_start, rra_start;
328     rrd_value_t nan_buffer = DNAN;
330     /* compute the offset for the cdp area */
331     cdp_start = sizeof(stat_head_t) +
332         rrd->stat_head->ds_cnt * sizeof(ds_def_t) +
333         rrd->stat_head->rra_cnt * sizeof(rra_def_t) +
334         sizeof(live_head_t) + rrd->stat_head->ds_cnt * sizeof(pdp_prep_t);
335     /* compute the offset for the first rra */
336     rra_start = cdp_start +
337         (rrd->stat_head->ds_cnt) * (rrd->stat_head->rra_cnt) *
338         sizeof(cdp_prep_t) + rrd->stat_head->rra_cnt * sizeof(rra_ptr_t);
340     /* loop over the RRAs */
341     for (rra_idx = 0; rra_idx < rrd->stat_head->rra_cnt; rra_idx++) {
342         cdp_idx = rra_idx * (rrd->stat_head->ds_cnt) + ds_idx;
343         switch (cf_conv(rrd->rra_def[rra_idx].cf_nam)) {
344         case CF_HWPREDICT:
345         case CF_MHWPREDICT:
346             init_hwpredict_cdp(&(rrd->cdp_prep[cdp_idx]));
347             break;
348         case CF_SEASONAL:
349         case CF_DEVSEASONAL:
350             /* don't use init_seasonal because it will reset burn-in, which
351              * means different data sources will be calling for the smoother
352              * at different times. */
353             rrd->cdp_prep[cdp_idx].scratch[CDP_hw_seasonal].u_val = DNAN;
354             rrd->cdp_prep[cdp_idx].scratch[CDP_hw_last_seasonal].u_val = DNAN;
355             /* move to first entry of data source for this rra */
356             rrd_seek(rrd_file, rra_start + ds_idx * sizeof(rrd_value_t),
357                      SEEK_SET);
358             /* entries for the same data source are not contiguous, 
359              * temporal entries are contiguous */
360             for (i = 0; i < rrd->rra_def[rra_idx].row_cnt; ++i) {
361                 if (rrd_write(rrd_file, &nan_buffer, sizeof(rrd_value_t) * 1)
362                     != sizeof(rrd_value_t) * 1) {
363                     rrd_set_error
364                         ("reset_aberrant_coefficients: write failed data source %lu rra %s",
365                          ds_idx, rrd->rra_def[rra_idx].cf_nam);
366                     return;
367                 }
368                 rrd_seek(rrd_file, (rrd->stat_head->ds_cnt - 1) *
369                          sizeof(rrd_value_t), SEEK_CUR);
370             }
371             break;
372         case CF_FAILURES:
373             erase_violations(rrd, cdp_idx, rra_idx);
374             break;
375         default:
376             break;
377         }
378         /* move offset to the next rra */
379         rra_start += rrd->rra_def[rra_idx].row_cnt * rrd->stat_head->ds_cnt *
380             sizeof(rrd_value_t);
381     }
382     rrd_seek(rrd_file, cdp_start, SEEK_SET);
383     if (rrd_write(rrd_file, rrd->cdp_prep,
384                   sizeof(cdp_prep_t) *
385                   (rrd->stat_head->rra_cnt) * rrd->stat_head->ds_cnt)
386         != (ssize_t) (sizeof(cdp_prep_t) * (rrd->stat_head->rra_cnt) *
387                       (rrd->stat_head->ds_cnt))) {
388         rrd_set_error("reset_aberrant_coefficients: cdp_prep write failed");
389     }
392 void init_hwpredict_cdp(
393     cdp_prep_t *cdp)
395     cdp->scratch[CDP_hw_intercept].u_val = DNAN;
396     cdp->scratch[CDP_hw_last_intercept].u_val = DNAN;
397     cdp->scratch[CDP_hw_slope].u_val = DNAN;
398     cdp->scratch[CDP_hw_last_slope].u_val = DNAN;
399     cdp->scratch[CDP_null_count].u_cnt = 1;
400     cdp->scratch[CDP_last_null_count].u_cnt = 1;
403 void init_seasonal_cdp(
404     cdp_prep_t *cdp)
406     cdp->scratch[CDP_hw_seasonal].u_val = DNAN;
407     cdp->scratch[CDP_hw_last_seasonal].u_val = DNAN;
408     cdp->scratch[CDP_init_seasonal].u_cnt = 1;
411 int update_aberrant_CF(
412     rrd_t *rrd,
413     rrd_value_t pdp_val,
414     enum cf_en current_cf,
415     unsigned long cdp_idx,
416     unsigned long rra_idx,
417     unsigned long ds_idx,
418     unsigned short CDP_scratch_idx,
419     rrd_value_t *seasonal_coef)
421     static hw_functions_t hw_multiplicative_functions = {
422         hw_multiplicative_calculate_prediction,
423         hw_multiplicative_calculate_intercept,
424         hw_calculate_slope,
425         hw_multiplicative_calculate_seasonality,
426         hw_multiplicative_init_seasonality,
427         hw_calculate_seasonal_deviation,
428         hw_init_seasonal_deviation,
429         1.0             /* identity value */
430     };
432     static hw_functions_t hw_additive_functions = {
433         hw_additive_calculate_prediction,
434         hw_additive_calculate_intercept,
435         hw_calculate_slope,
436         hw_additive_calculate_seasonality,
437         hw_additive_init_seasonality,
438         hw_calculate_seasonal_deviation,
439         hw_init_seasonal_deviation,
440         0.0             /* identity value  */
441     };
443     rrd->cdp_prep[cdp_idx].scratch[CDP_scratch_idx].u_val = pdp_val;
444     switch (current_cf) {
445     case CF_HWPREDICT:
446         return update_hwpredict(rrd, cdp_idx, rra_idx, ds_idx,
447                                 CDP_scratch_idx, &hw_additive_functions);
448     case CF_MHWPREDICT:
449         return update_hwpredict(rrd, cdp_idx, rra_idx, ds_idx,
450                                 CDP_scratch_idx,
451                                 &hw_multiplicative_functions);
452     case CF_DEVPREDICT:
453         return update_devpredict(rrd, cdp_idx, rra_idx, ds_idx,
454                                  CDP_scratch_idx);
455     case CF_SEASONAL:
456         switch (cf_conv(rrd->rra_def[hw_dep_idx(rrd, rra_idx)].cf_nam)) {
457         case CF_HWPREDICT:
458             return update_seasonal(rrd, cdp_idx, rra_idx, ds_idx,
459                                    CDP_scratch_idx, seasonal_coef,
460                                    &hw_additive_functions);
461         case CF_MHWPREDICT:
462             return update_seasonal(rrd, cdp_idx, rra_idx, ds_idx,
463                                    CDP_scratch_idx, seasonal_coef,
464                                    &hw_multiplicative_functions);
465         default:
466             return -1;
467         }
468     case CF_DEVSEASONAL:
469         switch (cf_conv(rrd->rra_def[hw_dep_idx(rrd, rra_idx)].cf_nam)) {
470         case CF_HWPREDICT:
471             return update_devseasonal(rrd, cdp_idx, rra_idx, ds_idx,
472                                       CDP_scratch_idx, seasonal_coef,
473                                       &hw_additive_functions);
474         case CF_MHWPREDICT:
475             return update_devseasonal(rrd, cdp_idx, rra_idx, ds_idx,
476                                       CDP_scratch_idx, seasonal_coef,
477                                       &hw_multiplicative_functions);
478         default:
479             return -1;
480         }
481     case CF_FAILURES:
482         switch (cf_conv
483                 (rrd->rra_def[hw_dep_idx(rrd, hw_dep_idx(rrd, rra_idx))].
484                  cf_nam)) {
485         case CF_HWPREDICT:
486             return update_failures(rrd, cdp_idx, rra_idx, ds_idx,
487                                    CDP_scratch_idx, &hw_additive_functions);
488         case CF_MHWPREDICT:
489             return update_failures(rrd, cdp_idx, rra_idx, ds_idx,
490                                    CDP_scratch_idx,
491                                    &hw_multiplicative_functions);
492         default:
493             return -1;
494         }
495     case CF_AVERAGE:
496     default:
497         return 0;
498     }
499     return -1;
/* Compute val mod mod for a possibly negative val, returning a value in
 * [0, mod).
 *
 * Fixes vs. the previous version: it used abs(), which takes an int and
 * truncates long arguments on LP64 platforms (and is UB for LONG_MIN);
 * and when a negative val was an exact multiple of mod it returned mod
 * itself, one past the valid range (a caller using the result as an
 * array index would read out of bounds). */
static unsigned long MyMod(
    signed long val,
    unsigned long mod)
{
    unsigned long new_val;

    if (val < 0) {
        /* negate via unsigned arithmetic: well-defined even for LONG_MIN */
        new_val = (-(unsigned long) val) % mod;
        return (new_val == 0) ? 0 : mod - new_val;
    }
    return ((unsigned long) val) % mod;
}
/* a standard fixed-capacity FIFO queue implementation
 * No overflow checking is performed. */
521 int queue_alloc(
522     FIFOqueue **q,
523     int capacity)
525     *q = (FIFOqueue *) malloc(sizeof(FIFOqueue));
526     if (*q == NULL)
527         return -1;
528     (*q)->queue = (rrd_value_t *) malloc(sizeof(rrd_value_t) * capacity);
529     if ((*q)->queue == NULL) {
530         free(*q);
531         return -1;
532     }
533     (*q)->capacity = capacity;
534     (*q)->head = capacity;
535     (*q)->tail = 0;
536     return 0;
539 int queue_isempty(
540     FIFOqueue *q)
542     return (q->head % q->capacity == q->tail);
545 void queue_push(
546     FIFOqueue *q,
547     rrd_value_t value)
549     q->queue[(q->tail)++] = value;
550     q->tail = q->tail % q->capacity;
553 rrd_value_t queue_pop(
554     FIFOqueue *q)
556     q->head = q->head % q->capacity;
557     return q->queue[(q->head)++];
560 void queue_dealloc(
561     FIFOqueue *q)
563     free(q->queue);
564     free(q);