Code

Imported upstream version 1.3.8.
[pkg-rrdtool.git] / src / rrd_hw.c
1 /*****************************************************************************
2  * RRDtool 1.3.8  Copyright by Tobi Oetiker, 1997-2009
3  *****************************************************************************
4  * rrd_hw.c : Support for Holt-Winters Smoothing/ Aberrant Behavior Detection
5  *****************************************************************************
6  * Initial version by Jake Brutlag, WebTV Networks, 5/1/00
7  *****************************************************************************/
9 #include "rrd_tool.h"
10 #include "rrd_hw.h"
11 #include "rrd_hw_math.h"
12 #include "rrd_hw_update.h"
14 #ifdef WIN32
15 #include <stdlib.h>
16 #endif
/* Dependent-RRA index stored in the definition of RRA `rra_idx`: reads
 * par[RRA_dependent_rra_idx].u_cnt of rrd->rra_def[rra_idx].
 * Arguments and the whole expansion are parenthesized so the macro is safe
 * with arbitrary argument expressions and inside larger expressions. */
#define hw_dep_idx(rrd, rra_idx) \
    ((rrd)->rra_def[(rra_idx)].par[RRA_dependent_rra_idx].u_cnt)
20 /* #define DEBUG */
22 /* private functions */
23 static unsigned long MyMod(
24     signed long val,
25     unsigned long mod);
27 int lookup_seasonal(
28     rrd_t *rrd,
29     unsigned long rra_idx,
30     unsigned long rra_start,
31     rrd_file_t *rrd_file,
32     unsigned long offset,
33     rrd_value_t **seasonal_coef)
34 {
35     unsigned long pos_tmp;
37     /* rra_ptr[].cur_row points to the rra row to be written; this function
38      * reads cur_row + offset */
39     unsigned long row_idx = rrd->rra_ptr[rra_idx].cur_row + offset;
41     /* handle wrap around */
42     if (row_idx >= rrd->rra_def[rra_idx].row_cnt)
43         row_idx = row_idx % (rrd->rra_def[rra_idx].row_cnt);
45     /* rra_start points to the appropriate rra block in the file */
46     /* compute the pointer to the appropriate location in the file */
47     pos_tmp =
48         rra_start +
49         (row_idx) * (rrd->stat_head->ds_cnt) * sizeof(rrd_value_t);
51     /* allocate memory if need be */
52     if (*seasonal_coef == NULL)
53         *seasonal_coef =
54             (rrd_value_t *) malloc((rrd->stat_head->ds_cnt) *
55                                    sizeof(rrd_value_t));
56     if (*seasonal_coef == NULL) {
57         rrd_set_error("memory allocation failure: seasonal coef");
58         return -1;
59     }
61     if (!rrd_seek(rrd_file, pos_tmp, SEEK_SET)) {
62         if (rrd_read
63             (rrd_file, *seasonal_coef,
64              sizeof(rrd_value_t) * rrd->stat_head->ds_cnt)
65             == (ssize_t) (sizeof(rrd_value_t) * rrd->stat_head->ds_cnt)) {
66             /* success! */
67             /* we can safely ignore the rule requiring a seek operation between read
68              * and write, because this read moves the file pointer to somewhere
69              * in the file other than the next write location.
70              * */
71             return 0;
72         } else {
73             rrd_set_error("read operation failed in lookup_seasonal(): %lu\n",
74                           pos_tmp);
75         }
76     } else {
77         rrd_set_error("seek operation failed in lookup_seasonal(): %lu\n",
78                       pos_tmp);
79     }
81     return -1;
82 }
84 /* For the specified CDP prep area and the FAILURES RRA,
85  * erase all history of past violations.
86  */
87 void erase_violations(
88     rrd_t *rrd,
89     unsigned long cdp_idx,
90     unsigned long rra_idx)
91 {
92     unsigned short i;
93     char     *violations_array;
95     /* check that rra_idx is a CF_FAILURES array */
96     if (cf_conv(rrd->rra_def[rra_idx].cf_nam) != CF_FAILURES) {
97 #ifdef DEBUG
98         fprintf(stderr, "erase_violations called for non-FAILURES RRA: %s\n",
99                 rrd->rra_def[rra_idx].cf_nam);
100 #endif
101         return;
102     }
103 #ifdef DEBUG
104     fprintf(stderr, "scratch buffer before erase:\n");
105     for (i = 0; i < MAX_CDP_PAR_EN; i++) {
106         fprintf(stderr, "%lu ", rrd->cdp_prep[cdp_idx].scratch[i].u_cnt);
107     }
108     fprintf(stderr, "\n");
109 #endif
111     /* WARNING: an array of longs on disk is treated as an array of chars
112      * in memory. */
113     violations_array = (char *) ((void *) rrd->cdp_prep[cdp_idx].scratch);
114     /* erase everything in the part of the CDP scratch array that will be
115      * used to store violations for the current window */
116     for (i = rrd->rra_def[rra_idx].par[RRA_window_len].u_cnt; i > 0; i--) {
117         violations_array[i - 1] = 0;
118     }
119 #ifdef DEBUG
120     fprintf(stderr, "scratch buffer after erase:\n");
121     for (i = 0; i < MAX_CDP_PAR_EN; i++) {
122         fprintf(stderr, "%lu ", rrd->cdp_prep[cdp_idx].scratch[i].u_cnt);
123     }
124     fprintf(stderr, "\n");
125 #endif
128 /* Smooth a periodic array with a moving average: equal weights and
129  * length = 5% of the period. */
130 int apply_smoother(
131     rrd_t *rrd,
132     unsigned long rra_idx,
133     unsigned long rra_start,
134     rrd_file_t *rrd_file)
136     unsigned long i, j, k;
137     unsigned long totalbytes;
138     rrd_value_t *rrd_values;
139     unsigned long row_length = rrd->stat_head->ds_cnt;
140     unsigned long row_count = rrd->rra_def[rra_idx].row_cnt;
141     unsigned long offset;
142     FIFOqueue **buffers;
143     rrd_value_t *working_average;
144     rrd_value_t *baseline;
146     if (atoi(rrd->stat_head->version) >= 4) {
147         offset = floor(rrd->rra_def[rra_idx].
148                        par[RRA_seasonal_smoothing_window].
149                        u_val / 2 * row_count);
150     } else {
151         offset = floor(0.05 / 2 * row_count);
152     }
154     if (offset == 0)
155         return 0;       /* no smoothing */
157     /* allocate memory */
158     totalbytes = sizeof(rrd_value_t) * row_length * row_count;
159     rrd_values = (rrd_value_t *) malloc(totalbytes);
160     if (rrd_values == NULL) {
161         rrd_set_error("apply smoother: memory allocation failure");
162         return -1;
163     }
165     /* rra_start is at the beginning of this rra */
166     if (rrd_seek(rrd_file, rra_start, SEEK_SET)) {
167         rrd_set_error("seek to rra %d failed", rra_start);
168         free(rrd_values);
169         return -1;
170     }
172     /* could read all data in a single block, but we need to
173      * check for NA values */
174     for (i = 0; i < row_count; ++i) {
175         for (j = 0; j < row_length; ++j) {
176             if (rrd_read
177                 (rrd_file, &(rrd_values[i * row_length + j]),
178                  sizeof(rrd_value_t) * 1)
179                 != (ssize_t) (sizeof(rrd_value_t) * 1)) {
180                 rrd_set_error("reading value failed: %s",
181                               rrd_strerror(errno));
182             }
183             if (isnan(rrd_values[i * row_length + j])) {
184                 /* can't apply smoothing, still uninitialized values */
185 #ifdef DEBUG
186                 fprintf(stderr,
187                         "apply_smoother: NA detected in seasonal array: %ld %ld\n",
188                         i, j);
189 #endif
190                 free(rrd_values);
191                 return 0;
192             }
193         }
194     }
196     /* allocate queues, one for each data source */
197     buffers = (FIFOqueue **) malloc(sizeof(FIFOqueue *) * row_length);
198     for (i = 0; i < row_length; ++i) {
199         queue_alloc(&(buffers[i]), 2 * offset + 1);
200     }
201     /* need working average initialized to 0 */
202     working_average = (rrd_value_t *) calloc(row_length, sizeof(rrd_value_t));
203     baseline = (rrd_value_t *) calloc(row_length, sizeof(rrd_value_t));
205     /* compute sums of the first 2*offset terms */
206     for (i = 0; i < 2 * offset; ++i) {
207         k = MyMod(i - offset, row_count);
208         for (j = 0; j < row_length; ++j) {
209             queue_push(buffers[j], rrd_values[k * row_length + j]);
210             working_average[j] += rrd_values[k * row_length + j];
211         }
212     }
214     /* compute moving averages */
215     for (i = offset; i < row_count + offset; ++i) {
216         for (j = 0; j < row_length; ++j) {
217             k = MyMod(i, row_count);
218             /* add a term to the sum */
219             working_average[j] += rrd_values[k * row_length + j];
220             queue_push(buffers[j], rrd_values[k * row_length + j]);
222             /* reset k to be the center of the window */
223             k = MyMod(i - offset, row_count);
224             /* overwrite rdd_values entry, the old value is already
225              * saved in buffers */
226             rrd_values[k * row_length + j] =
227                 working_average[j] / (2 * offset + 1);
228             baseline[j] += rrd_values[k * row_length + j];
230             /* remove a term from the sum */
231             working_average[j] -= queue_pop(buffers[j]);
232         }
233     }
235     for (i = 0; i < row_length; ++i) {
236         queue_dealloc(buffers[i]);
237         baseline[i] /= row_count;
238     }
239     free(buffers);
240     free(working_average);
242     if (cf_conv(rrd->rra_def[rra_idx].cf_nam) == CF_SEASONAL) {
243         rrd_value_t (
244     *init_seasonality) (
245     rrd_value_t seasonal_coef,
246     rrd_value_t intercept);
248         switch (cf_conv(rrd->rra_def[hw_dep_idx(rrd, rra_idx)].cf_nam)) {
249         case CF_HWPREDICT:
250             init_seasonality = hw_additive_init_seasonality;
251             break;
252         case CF_MHWPREDICT:
253             init_seasonality = hw_multiplicative_init_seasonality;
254             break;
255         default:
256             rrd_set_error("apply smoother: SEASONAL rra doesn't have "
257                           "valid dependency: %s",
258                           rrd->rra_def[hw_dep_idx(rrd, rra_idx)].cf_nam);
259             return -1;
260         }
262         for (j = 0; j < row_length; ++j) {
263             for (i = 0; i < row_count; ++i) {
264                 rrd_values[i * row_length + j] =
265                     init_seasonality(rrd_values[i * row_length + j],
266                                      baseline[j]);
267             }
268             /* update the baseline coefficient,
269              * first, compute the cdp_index. */
270             offset = hw_dep_idx(rrd, rra_idx) * row_length + j;
271             (rrd->cdp_prep[offset]).scratch[CDP_hw_intercept].u_val +=
272                 baseline[j];
273         }
274         /* flush cdp to disk */
275         if (rrd_seek(rrd_file, sizeof(stat_head_t) +
276                      rrd->stat_head->ds_cnt * sizeof(ds_def_t) +
277                      rrd->stat_head->rra_cnt * sizeof(rra_def_t) +
278                      sizeof(live_head_t) +
279                      rrd->stat_head->ds_cnt * sizeof(pdp_prep_t), SEEK_SET)) {
280             rrd_set_error("apply_smoother: seek to cdp_prep failed");
281             free(rrd_values);
282             return -1;
283         }
284         if (rrd_write(rrd_file, rrd->cdp_prep,
285                       sizeof(cdp_prep_t) *
286                       (rrd->stat_head->rra_cnt) * rrd->stat_head->ds_cnt)
287             != (ssize_t) (sizeof(cdp_prep_t) * (rrd->stat_head->rra_cnt) *
288                           (rrd->stat_head->ds_cnt))) {
289             rrd_set_error("apply_smoother: cdp_prep write failed");
290             free(rrd_values);
291             return -1;
292         }
293     }
295     /* endif CF_SEASONAL */
296     /* flush updated values to disk */
297     if (rrd_seek(rrd_file, rra_start, SEEK_SET)) {
298         rrd_set_error("apply_smoother: seek to pos %d failed", rra_start);
299         free(rrd_values);
300         return -1;
301     }
302     /* write as a single block */
303     if (rrd_write
304         (rrd_file, rrd_values, sizeof(rrd_value_t) * row_length * row_count)
305         != (ssize_t) (sizeof(rrd_value_t) * row_length * row_count)) {
306         rrd_set_error("apply_smoother: write failed to %lu", rra_start);
307         free(rrd_values);
308         return -1;
309     }
311     free(rrd_values);
312     free(baseline);
313     return 0;
316 /* Reset aberrant behavior model coefficients, including intercept, slope,
317  * seasonal, and seasonal deviation for the specified data source. */
318 void reset_aberrant_coefficients(
319     rrd_t *rrd,
320     rrd_file_t *rrd_file,
321     unsigned long ds_idx)
323     unsigned long cdp_idx, rra_idx, i;
324     unsigned long cdp_start, rra_start;
325     rrd_value_t nan_buffer = DNAN;
327     /* compute the offset for the cdp area */
328     cdp_start = sizeof(stat_head_t) +
329         rrd->stat_head->ds_cnt * sizeof(ds_def_t) +
330         rrd->stat_head->rra_cnt * sizeof(rra_def_t) +
331         sizeof(live_head_t) + rrd->stat_head->ds_cnt * sizeof(pdp_prep_t);
332     /* compute the offset for the first rra */
333     rra_start = cdp_start +
334         (rrd->stat_head->ds_cnt) * (rrd->stat_head->rra_cnt) *
335         sizeof(cdp_prep_t) + rrd->stat_head->rra_cnt * sizeof(rra_ptr_t);
337     /* loop over the RRAs */
338     for (rra_idx = 0; rra_idx < rrd->stat_head->rra_cnt; rra_idx++) {
339         cdp_idx = rra_idx * (rrd->stat_head->ds_cnt) + ds_idx;
340         switch (cf_conv(rrd->rra_def[rra_idx].cf_nam)) {
341         case CF_HWPREDICT:
342         case CF_MHWPREDICT:
343             init_hwpredict_cdp(&(rrd->cdp_prep[cdp_idx]));
344             break;
345         case CF_SEASONAL:
346         case CF_DEVSEASONAL:
347             /* don't use init_seasonal because it will reset burn-in, which
348              * means different data sources will be calling for the smoother
349              * at different times. */
350             rrd->cdp_prep[cdp_idx].scratch[CDP_hw_seasonal].u_val = DNAN;
351             rrd->cdp_prep[cdp_idx].scratch[CDP_hw_last_seasonal].u_val = DNAN;
352             /* move to first entry of data source for this rra */
353             rrd_seek(rrd_file, rra_start + ds_idx * sizeof(rrd_value_t),
354                      SEEK_SET);
355             /* entries for the same data source are not contiguous, 
356              * temporal entries are contiguous */
357             for (i = 0; i < rrd->rra_def[rra_idx].row_cnt; ++i) {
358                 if (rrd_write(rrd_file, &nan_buffer, sizeof(rrd_value_t) * 1)
359                     != sizeof(rrd_value_t) * 1) {
360                     rrd_set_error
361                         ("reset_aberrant_coefficients: write failed data source %lu rra %s",
362                          ds_idx, rrd->rra_def[rra_idx].cf_nam);
363                     return;
364                 }
365                 rrd_seek(rrd_file, (rrd->stat_head->ds_cnt - 1) *
366                          sizeof(rrd_value_t), SEEK_CUR);
367             }
368             break;
369         case CF_FAILURES:
370             erase_violations(rrd, cdp_idx, rra_idx);
371             break;
372         default:
373             break;
374         }
375         /* move offset to the next rra */
376         rra_start += rrd->rra_def[rra_idx].row_cnt * rrd->stat_head->ds_cnt *
377             sizeof(rrd_value_t);
378     }
379     rrd_seek(rrd_file, cdp_start, SEEK_SET);
380     if (rrd_write(rrd_file, rrd->cdp_prep,
381                   sizeof(cdp_prep_t) *
382                   (rrd->stat_head->rra_cnt) * rrd->stat_head->ds_cnt)
383         != (ssize_t) (sizeof(cdp_prep_t) * (rrd->stat_head->rra_cnt) *
384                       (rrd->stat_head->ds_cnt))) {
385         rrd_set_error("reset_aberrant_coefficients: cdp_prep write failed");
386     }
389 void init_hwpredict_cdp(
390     cdp_prep_t *cdp)
392     cdp->scratch[CDP_hw_intercept].u_val = DNAN;
393     cdp->scratch[CDP_hw_last_intercept].u_val = DNAN;
394     cdp->scratch[CDP_hw_slope].u_val = DNAN;
395     cdp->scratch[CDP_hw_last_slope].u_val = DNAN;
396     cdp->scratch[CDP_null_count].u_cnt = 1;
397     cdp->scratch[CDP_last_null_count].u_cnt = 1;
400 void init_seasonal_cdp(
401     cdp_prep_t *cdp)
403     cdp->scratch[CDP_hw_seasonal].u_val = DNAN;
404     cdp->scratch[CDP_hw_last_seasonal].u_val = DNAN;
405     cdp->scratch[CDP_init_seasonal].u_cnt = 1;
408 int update_aberrant_CF(
409     rrd_t *rrd,
410     rrd_value_t pdp_val,
411     enum cf_en current_cf,
412     unsigned long cdp_idx,
413     unsigned long rra_idx,
414     unsigned long ds_idx,
415     unsigned short CDP_scratch_idx,
416     rrd_value_t *seasonal_coef)
418     static hw_functions_t hw_multiplicative_functions = {
419         hw_multiplicative_calculate_prediction,
420         hw_multiplicative_calculate_intercept,
421         hw_calculate_slope,
422         hw_multiplicative_calculate_seasonality,
423         hw_multiplicative_init_seasonality,
424         hw_calculate_seasonal_deviation,
425         hw_init_seasonal_deviation,
426         1.0             /* identity value */
427     };
429     static hw_functions_t hw_additive_functions = {
430         hw_additive_calculate_prediction,
431         hw_additive_calculate_intercept,
432         hw_calculate_slope,
433         hw_additive_calculate_seasonality,
434         hw_additive_init_seasonality,
435         hw_calculate_seasonal_deviation,
436         hw_init_seasonal_deviation,
437         0.0             /* identity value  */
438     };
440     rrd->cdp_prep[cdp_idx].scratch[CDP_scratch_idx].u_val = pdp_val;
441     switch (current_cf) {
442     case CF_HWPREDICT:
443         return update_hwpredict(rrd, cdp_idx, rra_idx, ds_idx,
444                                 CDP_scratch_idx, &hw_additive_functions);
445     case CF_MHWPREDICT:
446         return update_hwpredict(rrd, cdp_idx, rra_idx, ds_idx,
447                                 CDP_scratch_idx,
448                                 &hw_multiplicative_functions);
449     case CF_DEVPREDICT:
450         return update_devpredict(rrd, cdp_idx, rra_idx, ds_idx,
451                                  CDP_scratch_idx);
452     case CF_SEASONAL:
453         switch (cf_conv(rrd->rra_def[hw_dep_idx(rrd, rra_idx)].cf_nam)) {
454         case CF_HWPREDICT:
455             return update_seasonal(rrd, cdp_idx, rra_idx, ds_idx,
456                                    CDP_scratch_idx, seasonal_coef,
457                                    &hw_additive_functions);
458         case CF_MHWPREDICT:
459             return update_seasonal(rrd, cdp_idx, rra_idx, ds_idx,
460                                    CDP_scratch_idx, seasonal_coef,
461                                    &hw_multiplicative_functions);
462         default:
463             return -1;
464         }
465     case CF_DEVSEASONAL:
466         switch (cf_conv(rrd->rra_def[hw_dep_idx(rrd, rra_idx)].cf_nam)) {
467         case CF_HWPREDICT:
468             return update_devseasonal(rrd, cdp_idx, rra_idx, ds_idx,
469                                       CDP_scratch_idx, seasonal_coef,
470                                       &hw_additive_functions);
471         case CF_MHWPREDICT:
472             return update_devseasonal(rrd, cdp_idx, rra_idx, ds_idx,
473                                       CDP_scratch_idx, seasonal_coef,
474                                       &hw_multiplicative_functions);
475         default:
476             return -1;
477         }
478     case CF_FAILURES:
479         switch (cf_conv
480                 (rrd->rra_def[hw_dep_idx(rrd, hw_dep_idx(rrd, rra_idx))].
481                  cf_nam)) {
482         case CF_HWPREDICT:
483             return update_failures(rrd, cdp_idx, rra_idx, ds_idx,
484                                    CDP_scratch_idx, &hw_additive_functions);
485         case CF_MHWPREDICT:
486             return update_failures(rrd, cdp_idx, rra_idx, ds_idx,
487                                    CDP_scratch_idx,
488                                    &hw_multiplicative_functions);
489         default:
490             return -1;
491         }
492     case CF_AVERAGE:
493     default:
494         return 0;
495     }
496     return -1;
/* Mathematical (always non-negative) modulus of a possibly negative value.
 *
 * val - value to reduce; may be negative
 * mod - modulus; must not be 0
 *
 * Returns the representative of val modulo mod in the range [0, mod).
 * For a negative exact multiple of mod the result is 0, never mod, so the
 * result is always a valid index into an array of mod elements. */
static unsigned long MyMod(
    signed long val,
    unsigned long mod)
{
    unsigned long rem;

    if (val >= 0)
        return ((unsigned long) val) % mod;

    /* take the magnitude in unsigned arithmetic: this is well defined for
     * every long, including LONG_MIN, and avoids abs(), which takes an int */
    rem = (0UL - (unsigned long) val) % mod;
    if (rem == 0)
        return 0;
    return (mod - rem);
}
516 /* a standard fixed-capacity FIF0 queue implementation
517  * No overflow checking is performed. */
518 int queue_alloc(
519     FIFOqueue **q,
520     int capacity)
522     *q = (FIFOqueue *) malloc(sizeof(FIFOqueue));
523     if (*q == NULL)
524         return -1;
525     (*q)->queue = (rrd_value_t *) malloc(sizeof(rrd_value_t) * capacity);
526     if ((*q)->queue == NULL) {
527         free(*q);
528         return -1;
529     }
530     (*q)->capacity = capacity;
531     (*q)->head = capacity;
532     (*q)->tail = 0;
533     return 0;
536 int queue_isempty(
537     FIFOqueue *q)
539     return (q->head % q->capacity == q->tail);
542 void queue_push(
543     FIFOqueue *q,
544     rrd_value_t value)
546     q->queue[(q->tail)++] = value;
547     q->tail = q->tail % q->capacity;
550 rrd_value_t queue_pop(
551     FIFOqueue *q)
553     q->head = q->head % q->capacity;
554     return q->queue[(q->head)++];
557 void queue_dealloc(
558     FIFOqueue *q)
560     free(q->queue);
561     free(q);