e95fa082ae99243f15298eceaf1c599ecb64a55b
1 /*****************************************************************************
2 * RRDtool 1.3.5 Copyright by Tobi Oetiker, 1997-2008
3 *****************************************************************************
4 * rrd_hw.c : Support for Holt-Winters Smoothing/ Aberrant Behavior Detection
5 *****************************************************************************
6 * Initial version by Jake Brutlag, WebTV Networks, 5/1/00
7 *****************************************************************************/
9 #include "rrd_tool.h"
10 #include "rrd_hw.h"
11 #include "rrd_hw_math.h"
12 #include "rrd_hw_update.h"
14 #ifdef WIN32
15 #include <stdlib.h>
16 #endif
18 #define hw_dep_idx(rrd, rra_idx) rrd->rra_def[rra_idx].par[RRA_dependent_rra_idx].u_cnt
20 /* #define DEBUG */
22 /* private functions */
23 static unsigned long MyMod(
24 signed long val,
25 unsigned long mod);
/* Read one row of seasonal coefficients (one rrd_value_t per data source)
 * from a SEASONAL/DEVSEASONAL RRA into *seasonal_coef.
 *
 * rrd          in-core rrd structure (header arrays already loaded)
 * rra_idx      index of the seasonal RRA
 * rra_start    file offset of the start of this RRA's data area
 * rrd_file     open rrd file handle
 * offset       rows ahead of cur_row to read (wraps around row_cnt)
 * seasonal_coef in/out: if *seasonal_coef is NULL a ds_cnt-sized buffer is
 *              malloc'd here; the caller owns and eventually frees it.
 *
 * Returns 0 on success, -1 on allocation, seek, or read failure (with the
 * rrd error string set). */
int lookup_seasonal(
    rrd_t *rrd,
    unsigned long rra_idx,
    unsigned long rra_start,
    rrd_file_t *rrd_file,
    unsigned long offset,
    rrd_value_t **seasonal_coef)
{
    unsigned long pos_tmp;

    /* rra_ptr[].cur_row points to the rra row to be written; this function
     * reads cur_row + offset */
    unsigned long row_idx = rrd->rra_ptr[rra_idx].cur_row + offset;

    /* handle wrap around */
    if (row_idx >= rrd->rra_def[rra_idx].row_cnt)
        row_idx = row_idx % (rrd->rra_def[rra_idx].row_cnt);

    /* rra_start points to the appropriate rra block in the file */
    /* compute the pointer to the appropriate location in the file */
    pos_tmp =
        rra_start +
        (row_idx) * (rrd->stat_head->ds_cnt) * sizeof(rrd_value_t);

    /* allocate memory if need be */
    if (*seasonal_coef == NULL)
        *seasonal_coef =
            (rrd_value_t *) malloc((rrd->stat_head->ds_cnt) *
                                   sizeof(rrd_value_t));
    if (*seasonal_coef == NULL) {
        rrd_set_error("memory allocation failure: seasonal coef");
        return -1;
    }

    /* rrd_seek returns 0 on success, hence the negated test */
    if (!rrd_seek(rrd_file, pos_tmp, SEEK_SET)) {
        if (rrd_read
            (rrd_file, *seasonal_coef,
             sizeof(rrd_value_t) * rrd->stat_head->ds_cnt)
            == (ssize_t) (sizeof(rrd_value_t) * rrd->stat_head->ds_cnt)) {
            /* success! */
            /* we can safely ignore the rule requiring a seek operation between read
             * and write, because this read moves the file pointer to somewhere
             * in the file other than the next write location.
             * */
            return 0;
        } else {
            rrd_set_error("read operation failed in lookup_seasonal(): %lu\n",
                          pos_tmp);
        }
    } else {
        rrd_set_error("seek operation failed in lookup_seasonal(): %lu\n",
                      pos_tmp);
    }

    /* NOTE: on failure the buffer allocated above is intentionally kept;
     * the caller is responsible for freeing *seasonal_coef in all cases. */
    return -1;
}
84 /* For the specified CDP prep area and the FAILURES RRA,
85 * erase all history of past violations.
86 */
87 void erase_violations(
88 rrd_t *rrd,
89 unsigned long cdp_idx,
90 unsigned long rra_idx)
91 {
92 unsigned short i;
93 char *violations_array;
95 /* check that rra_idx is a CF_FAILURES array */
96 if (cf_conv(rrd->rra_def[rra_idx].cf_nam) != CF_FAILURES) {
97 #ifdef DEBUG
98 fprintf(stderr, "erase_violations called for non-FAILURES RRA: %s\n",
99 rrd->rra_def[rra_idx].cf_nam);
100 #endif
101 return;
102 }
103 #ifdef DEBUG
104 fprintf(stderr, "scratch buffer before erase:\n");
105 for (i = 0; i < MAX_CDP_PAR_EN; i++) {
106 fprintf(stderr, "%lu ", rrd->cdp_prep[cdp_idx].scratch[i].u_cnt);
107 }
108 fprintf(stderr, "\n");
109 #endif
111 /* WARNING: an array of longs on disk is treated as an array of chars
112 * in memory. */
113 violations_array = (char *) ((void *) rrd->cdp_prep[cdp_idx].scratch);
114 /* erase everything in the part of the CDP scratch array that will be
115 * used to store violations for the current window */
116 for (i = rrd->rra_def[rra_idx].par[RRA_window_len].u_cnt; i > 0; i--) {
117 violations_array[i - 1] = 0;
118 }
119 #ifdef DEBUG
120 fprintf(stderr, "scratch buffer after erase:\n");
121 for (i = 0; i < MAX_CDP_PAR_EN; i++) {
122 fprintf(stderr, "%lu ", rrd->cdp_prep[cdp_idx].scratch[i].u_cnt);
123 }
124 fprintf(stderr, "\n");
125 #endif
126 }
128 /* Smooth a periodic array with a moving average: equal weights and
129 * length = 5% of the period. */
130 int apply_smoother(
131 rrd_t *rrd,
132 unsigned long rra_idx,
133 unsigned long rra_start,
134 rrd_file_t *rrd_file)
135 {
136 unsigned long i, j, k;
137 unsigned long totalbytes;
138 rrd_value_t *rrd_values;
139 unsigned long row_length = rrd->stat_head->ds_cnt;
140 unsigned long row_count = rrd->rra_def[rra_idx].row_cnt;
141 unsigned long offset;
142 FIFOqueue **buffers;
143 rrd_value_t *working_average;
144 rrd_value_t *baseline;
146 if (atoi(rrd->stat_head->version) >= 4) {
147 offset = floor(rrd->rra_def[rra_idx].
148 par[RRA_seasonal_smoothing_window].
149 u_val / 2 * row_count);
150 } else {
151 offset = floor(0.05 / 2 * row_count);
152 }
154 if (offset == 0)
155 return 0; /* no smoothing */
157 /* allocate memory */
158 totalbytes = sizeof(rrd_value_t) * row_length * row_count;
159 rrd_values = (rrd_value_t *) malloc(totalbytes);
160 if (rrd_values == NULL) {
161 rrd_set_error("apply smoother: memory allocation failure");
162 return -1;
163 }
165 /* rra_start is at the beginning of this rra */
166 if (rrd_seek(rrd_file, rra_start, SEEK_SET)) {
167 rrd_set_error("seek to rra %d failed", rra_start);
168 free(rrd_values);
169 return -1;
170 }
171 rrd_flush(rrd_file);
172 /* could read all data in a single block, but we need to
173 * check for NA values */
174 for (i = 0; i < row_count; ++i) {
175 for (j = 0; j < row_length; ++j) {
176 if (rrd_read
177 (rrd_file, &(rrd_values[i * row_length + j]),
178 sizeof(rrd_value_t) * 1)
179 != (ssize_t) (sizeof(rrd_value_t) * 1)) {
180 rrd_set_error("reading value failed: %s",
181 rrd_strerror(errno));
182 }
183 if (isnan(rrd_values[i * row_length + j])) {
184 /* can't apply smoothing, still uninitialized values */
185 #ifdef DEBUG
186 fprintf(stderr,
187 "apply_smoother: NA detected in seasonal array: %ld %ld\n",
188 i, j);
189 #endif
190 free(rrd_values);
191 return 0;
192 }
193 }
194 }
196 /* allocate queues, one for each data source */
197 buffers = (FIFOqueue **) malloc(sizeof(FIFOqueue *) * row_length);
198 for (i = 0; i < row_length; ++i) {
199 queue_alloc(&(buffers[i]), 2 * offset + 1);
200 }
201 /* need working average initialized to 0 */
202 working_average = (rrd_value_t *) calloc(row_length, sizeof(rrd_value_t));
203 baseline = (rrd_value_t *) calloc(row_length, sizeof(rrd_value_t));
205 /* compute sums of the first 2*offset terms */
206 for (i = 0; i < 2 * offset; ++i) {
207 k = MyMod(i - offset, row_count);
208 for (j = 0; j < row_length; ++j) {
209 queue_push(buffers[j], rrd_values[k * row_length + j]);
210 working_average[j] += rrd_values[k * row_length + j];
211 }
212 }
214 /* compute moving averages */
215 for (i = offset; i < row_count + offset; ++i) {
216 for (j = 0; j < row_length; ++j) {
217 k = MyMod(i, row_count);
218 /* add a term to the sum */
219 working_average[j] += rrd_values[k * row_length + j];
220 queue_push(buffers[j], rrd_values[k * row_length + j]);
222 /* reset k to be the center of the window */
223 k = MyMod(i - offset, row_count);
224 /* overwrite rdd_values entry, the old value is already
225 * saved in buffers */
226 rrd_values[k * row_length + j] =
227 working_average[j] / (2 * offset + 1);
228 baseline[j] += rrd_values[k * row_length + j];
230 /* remove a term from the sum */
231 working_average[j] -= queue_pop(buffers[j]);
232 }
233 }
235 for (i = 0; i < row_length; ++i) {
236 queue_dealloc(buffers[i]);
237 baseline[i] /= row_count;
238 }
239 free(buffers);
240 free(working_average);
242 if (cf_conv(rrd->rra_def[rra_idx].cf_nam) == CF_SEASONAL) {
243 rrd_value_t (
244 *init_seasonality) (
245 rrd_value_t seasonal_coef,
246 rrd_value_t intercept);
248 switch (cf_conv(rrd->rra_def[hw_dep_idx(rrd, rra_idx)].cf_nam)) {
249 case CF_HWPREDICT:
250 init_seasonality = hw_additive_init_seasonality;
251 break;
252 case CF_MHWPREDICT:
253 init_seasonality = hw_multiplicative_init_seasonality;
254 break;
255 default:
256 rrd_set_error("apply smoother: SEASONAL rra doesn't have "
257 "valid dependency: %s",
258 rrd->rra_def[hw_dep_idx(rrd, rra_idx)].cf_nam);
259 return -1;
260 }
262 for (j = 0; j < row_length; ++j) {
263 for (i = 0; i < row_count; ++i) {
264 rrd_values[i * row_length + j] =
265 init_seasonality(rrd_values[i * row_length + j],
266 baseline[j]);
267 }
268 /* update the baseline coefficient,
269 * first, compute the cdp_index. */
270 offset = hw_dep_idx(rrd, rra_idx) * row_length + j;
271 (rrd->cdp_prep[offset]).scratch[CDP_hw_intercept].u_val +=
272 baseline[j];
273 }
274 /* flush cdp to disk */
275 rrd_flush(rrd_file);
276 if (rrd_seek(rrd_file, sizeof(stat_head_t) +
277 rrd->stat_head->ds_cnt * sizeof(ds_def_t) +
278 rrd->stat_head->rra_cnt * sizeof(rra_def_t) +
279 sizeof(live_head_t) +
280 rrd->stat_head->ds_cnt * sizeof(pdp_prep_t), SEEK_SET)) {
281 rrd_set_error("apply_smoother: seek to cdp_prep failed");
282 free(rrd_values);
283 return -1;
284 }
285 if (rrd_write(rrd_file, rrd->cdp_prep,
286 sizeof(cdp_prep_t) *
287 (rrd->stat_head->rra_cnt) * rrd->stat_head->ds_cnt)
288 != (ssize_t) (sizeof(cdp_prep_t) * (rrd->stat_head->rra_cnt) *
289 (rrd->stat_head->ds_cnt))) {
290 rrd_set_error("apply_smoother: cdp_prep write failed");
291 free(rrd_values);
292 return -1;
293 }
294 }
296 /* endif CF_SEASONAL */
297 /* flush updated values to disk */
298 rrd_flush(rrd_file);
299 if (rrd_seek(rrd_file, rra_start, SEEK_SET)) {
300 rrd_set_error("apply_smoother: seek to pos %d failed", rra_start);
301 free(rrd_values);
302 return -1;
303 }
304 /* write as a single block */
305 if (rrd_write
306 (rrd_file, rrd_values, sizeof(rrd_value_t) * row_length * row_count)
307 != (ssize_t) (sizeof(rrd_value_t) * row_length * row_count)) {
308 rrd_set_error("apply_smoother: write failed to %lu", rra_start);
309 free(rrd_values);
310 return -1;
311 }
313 rrd_flush(rrd_file);
314 free(rrd_values);
315 free(baseline);
316 return 0;
317 }
/* Reset aberrant behavior model coefficients, including intercept, slope,
 * seasonal, and seasonal deviation for the specified data source.
 *
 * rrd       in-core rrd structure; its cdp_prep entries are modified
 * rrd_file  open rrd file; the seasonal RRA columns for ds_idx are
 *           overwritten with DNAN and the full cdp_prep area is rewritten
 * ds_idx    index of the data source to reset
 *
 * Reports failures via rrd_set_error (no return value). */
void reset_aberrant_coefficients(
    rrd_t *rrd,
    rrd_file_t *rrd_file,
    unsigned long ds_idx)
{
    unsigned long cdp_idx, rra_idx, i;
    unsigned long cdp_start, rra_start;
    rrd_value_t nan_buffer = DNAN;

    /* compute the offset for the cdp area */
    /* on-disk layout: stat_head, ds_def[ds_cnt], rra_def[rra_cnt],
     * live_head, pdp_prep[ds_cnt], then the cdp_prep area */
    cdp_start = sizeof(stat_head_t) +
        rrd->stat_head->ds_cnt * sizeof(ds_def_t) +
        rrd->stat_head->rra_cnt * sizeof(rra_def_t) +
        sizeof(live_head_t) + rrd->stat_head->ds_cnt * sizeof(pdp_prep_t);
    /* compute the offset for the first rra */
    rra_start = cdp_start +
        (rrd->stat_head->ds_cnt) * (rrd->stat_head->rra_cnt) *
        sizeof(cdp_prep_t) + rrd->stat_head->rra_cnt * sizeof(rra_ptr_t);

    /* loop over the RRAs */
    for (rra_idx = 0; rra_idx < rrd->stat_head->rra_cnt; rra_idx++) {
        /* cdp_prep is laid out row-major: one entry per (rra, ds) pair */
        cdp_idx = rra_idx * (rrd->stat_head->ds_cnt) + ds_idx;
        switch (cf_conv(rrd->rra_def[rra_idx].cf_nam)) {
        case CF_HWPREDICT:
        case CF_MHWPREDICT:
            init_hwpredict_cdp(&(rrd->cdp_prep[cdp_idx]));
            break;
        case CF_SEASONAL:
        case CF_DEVSEASONAL:
            /* don't use init_seasonal because it will reset burn-in, which
             * means different data sources will be calling for the smoother
             * at different times. */
            rrd->cdp_prep[cdp_idx].scratch[CDP_hw_seasonal].u_val = DNAN;
            rrd->cdp_prep[cdp_idx].scratch[CDP_hw_last_seasonal].u_val = DNAN;
            /* move to first entry of data source for this rra */
            /* NOTE(review): this seek's return value is unchecked — a failed
             * seek would make the writes below land at the wrong offset */
            rrd_seek(rrd_file, rra_start + ds_idx * sizeof(rrd_value_t),
                     SEEK_SET);
            /* entries for the same data source are not contiguous,
             * temporal entries are contiguous */
            for (i = 0; i < rrd->rra_def[rra_idx].row_cnt; ++i) {
                if (rrd_write(rrd_file, &nan_buffer, sizeof(rrd_value_t) * 1)
                    != sizeof(rrd_value_t) * 1) {
                    rrd_set_error
                        ("reset_aberrant_coefficients: write failed data source %lu rra %s",
                         ds_idx, rrd->rra_def[rra_idx].cf_nam);
                    return;
                }
                /* skip over the other data sources to reach this data
                 * source's cell in the next row */
                rrd_seek(rrd_file, (rrd->stat_head->ds_cnt - 1) *
                         sizeof(rrd_value_t), SEEK_CUR);
            }
            break;
        case CF_FAILURES:
            erase_violations(rrd, cdp_idx, rra_idx);
            break;
        default:
            break;
        }
        /* move offset to the next rra */
        rra_start += rrd->rra_def[rra_idx].row_cnt * rrd->stat_head->ds_cnt *
            sizeof(rrd_value_t);
    }
    /* persist all in-core cdp_prep changes in a single write */
    rrd_seek(rrd_file, cdp_start, SEEK_SET);
    if (rrd_write(rrd_file, rrd->cdp_prep,
                  sizeof(cdp_prep_t) *
                  (rrd->stat_head->rra_cnt) * rrd->stat_head->ds_cnt)
        != (ssize_t) (sizeof(cdp_prep_t) * (rrd->stat_head->rra_cnt) *
                      (rrd->stat_head->ds_cnt))) {
        rrd_set_error("reset_aberrant_coefficients: cdp_prep write failed");
    }
}
392 void init_hwpredict_cdp(
393 cdp_prep_t *cdp)
394 {
395 cdp->scratch[CDP_hw_intercept].u_val = DNAN;
396 cdp->scratch[CDP_hw_last_intercept].u_val = DNAN;
397 cdp->scratch[CDP_hw_slope].u_val = DNAN;
398 cdp->scratch[CDP_hw_last_slope].u_val = DNAN;
399 cdp->scratch[CDP_null_count].u_cnt = 1;
400 cdp->scratch[CDP_last_null_count].u_cnt = 1;
401 }
403 void init_seasonal_cdp(
404 cdp_prep_t *cdp)
405 {
406 cdp->scratch[CDP_hw_seasonal].u_val = DNAN;
407 cdp->scratch[CDP_hw_last_seasonal].u_val = DNAN;
408 cdp->scratch[CDP_init_seasonal].u_cnt = 1;
409 }
/* Dispatch a primary data point to the aberrant-behavior update routine
 * matching the RRA's consolidation function.  The value is first stashed in
 * the CDP scratch slot; for SEASONAL/DEVSEASONAL/FAILURES the additive vs
 * multiplicative Holt-Winters function table is chosen by following the RRA
 * dependency chain (hw_dep_idx) back to the (M)HWPREDICT RRA.
 *
 * Returns the status of the invoked update routine, 0 for CFs with nothing
 * to do (e.g. CF_AVERAGE), and -1 for an invalid dependency. */
int update_aberrant_CF(
    rrd_t *rrd,
    rrd_value_t pdp_val,
    enum cf_en current_cf,
    unsigned long cdp_idx,
    unsigned long rra_idx,
    unsigned long ds_idx,
    unsigned short CDP_scratch_idx,
    rrd_value_t *seasonal_coef)
{
    /* function table for multiplicative Holt-Winters (MHWPREDICT) */
    static hw_functions_t hw_multiplicative_functions = {
        hw_multiplicative_calculate_prediction,
        hw_multiplicative_calculate_intercept,
        hw_calculate_slope,
        hw_multiplicative_calculate_seasonality,
        hw_multiplicative_init_seasonality,
        hw_calculate_seasonal_deviation,
        hw_init_seasonal_deviation,
        1.0             /* identity value */
    };

    /* function table for additive Holt-Winters (HWPREDICT) */
    static hw_functions_t hw_additive_functions = {
        hw_additive_calculate_prediction,
        hw_additive_calculate_intercept,
        hw_calculate_slope,
        hw_additive_calculate_seasonality,
        hw_additive_init_seasonality,
        hw_calculate_seasonal_deviation,
        hw_init_seasonal_deviation,
        0.0             /* identity value */
    };

    /* stash the primary data point where the update routines expect it */
    rrd->cdp_prep[cdp_idx].scratch[CDP_scratch_idx].u_val = pdp_val;
    switch (current_cf) {
    case CF_HWPREDICT:
        return update_hwpredict(rrd, cdp_idx, rra_idx, ds_idx,
                                CDP_scratch_idx, &hw_additive_functions);
    case CF_MHWPREDICT:
        return update_hwpredict(rrd, cdp_idx, rra_idx, ds_idx,
                                CDP_scratch_idx,
                                &hw_multiplicative_functions);
    case CF_DEVPREDICT:
        return update_devpredict(rrd, cdp_idx, rra_idx, ds_idx,
                                 CDP_scratch_idx);
    case CF_SEASONAL:
        /* the SEASONAL RRA depends directly on the (M)HWPREDICT RRA */
        switch (cf_conv(rrd->rra_def[hw_dep_idx(rrd, rra_idx)].cf_nam)) {
        case CF_HWPREDICT:
            return update_seasonal(rrd, cdp_idx, rra_idx, ds_idx,
                                   CDP_scratch_idx, seasonal_coef,
                                   &hw_additive_functions);
        case CF_MHWPREDICT:
            return update_seasonal(rrd, cdp_idx, rra_idx, ds_idx,
                                   CDP_scratch_idx, seasonal_coef,
                                   &hw_multiplicative_functions);
        default:
            return -1;
        }
    case CF_DEVSEASONAL:
        switch (cf_conv(rrd->rra_def[hw_dep_idx(rrd, rra_idx)].cf_nam)) {
        case CF_HWPREDICT:
            return update_devseasonal(rrd, cdp_idx, rra_idx, ds_idx,
                                      CDP_scratch_idx, seasonal_coef,
                                      &hw_additive_functions);
        case CF_MHWPREDICT:
            return update_devseasonal(rrd, cdp_idx, rra_idx, ds_idx,
                                      CDP_scratch_idx, seasonal_coef,
                                      &hw_multiplicative_functions);
        default:
            return -1;
        }
    case CF_FAILURES:
        /* FAILURES depends on DEVSEASONAL, which in turn depends on
         * (M)HWPREDICT: follow the chain two hops */
        switch (cf_conv
                (rrd->rra_def[hw_dep_idx(rrd, hw_dep_idx(rrd, rra_idx))].
                 cf_nam)) {
        case CF_HWPREDICT:
            return update_failures(rrd, cdp_idx, rra_idx, ds_idx,
                                   CDP_scratch_idx, &hw_additive_functions);
        case CF_MHWPREDICT:
            return update_failures(rrd, cdp_idx, rra_idx, ds_idx,
                                   CDP_scratch_idx,
                                   &hw_multiplicative_functions);
        default:
            return -1;
        }
    case CF_AVERAGE:
    default:
        /* nothing aberrant-related to update for plain consolidation */
        return 0;
    }
    return -1;
}
/* Mathematical (always non-negative) modulo of a signed value, used to wrap
 * array indices that may go negative.  Returns val mod `mod` in the range
 * [0, mod).  `mod` must be non-zero. */
static unsigned long MyMod(
    signed long val,
    unsigned long mod)
{
    unsigned long new_val;

    if (val < 0) {
        /* BUGFIX: use labs(), not abs() — abs() takes an int and would
         * truncate a long argument on LP64 platforms */
        new_val = ((unsigned long) labs(val)) % mod;
        /* BUGFIX: when val is an exact negative multiple of mod the
         * remainder is 0 and the answer is 0, not mod (the old code
         * returned mod, an out-of-range index) */
        return (new_val == 0) ? 0 : (mod - new_val);
    }
    return ((unsigned long) val) % mod;
}
519 /* a standard fixed-capacity FIF0 queue implementation
520 * No overflow checking is performed. */
521 int queue_alloc(
522 FIFOqueue **q,
523 int capacity)
524 {
525 *q = (FIFOqueue *) malloc(sizeof(FIFOqueue));
526 if (*q == NULL)
527 return -1;
528 (*q)->queue = (rrd_value_t *) malloc(sizeof(rrd_value_t) * capacity);
529 if ((*q)->queue == NULL) {
530 free(*q);
531 return -1;
532 }
533 (*q)->capacity = capacity;
534 (*q)->head = capacity;
535 (*q)->tail = 0;
536 return 0;
537 }
539 int queue_isempty(
540 FIFOqueue *q)
541 {
542 return (q->head % q->capacity == q->tail);
543 }
545 void queue_push(
546 FIFOqueue *q,
547 rrd_value_t value)
548 {
549 q->queue[(q->tail)++] = value;
550 q->tail = q->tail % q->capacity;
551 }
553 rrd_value_t queue_pop(
554 FIFOqueue *q)
555 {
556 q->head = q->head % q->capacity;
557 return q->queue[(q->head)++];
558 }
560 void queue_dealloc(
561 FIFOqueue *q)
562 {
563 free(q->queue);
564 free(q);
565 }