6f2c50732051c5c16dfbb8644904b968e8c40956
1 /*****************************************************************************
2 * RRDtool 1.3.2 Copyright by Tobi Oetiker, 1997-2008
3 *****************************************************************************
4 * rrd_hw.c : Support for Holt-Winters Smoothing/ Aberrant Behavior Detection
5 *****************************************************************************
6 * Initial version by Jake Brutlag, WebTV Networks, 5/1/00
7 *****************************************************************************/
9 #include <stdlib.h>
11 #include "rrd_tool.h"
12 #include "rrd_hw.h"
13 #include "rrd_hw_math.h"
14 #include "rrd_hw_update.h"
16 #define hw_dep_idx(rrd, rra_idx) rrd->rra_def[rra_idx].par[RRA_dependent_rra_idx].u_cnt
18 /* #define DEBUG */
20 /* private functions */
21 static unsigned long MyMod(
22 signed long val,
23 unsigned long mod);
/* Read one row of seasonal coefficients from the SEASONAL/DEVSEASONAL RRA
 * at index rra_idx, lazily allocating the destination buffer.
 *
 * rrd           in-core rrd structure
 * rra_idx       index of the seasonal RRA to read from
 * rra_start     file offset of the start of that RRA's data block
 * rrd_file      open rrd file handle
 * offset        row offset relative to the RRA's current row (cur_row)
 * seasonal_coef in/out: array of ds_cnt coefficients; allocated here when
 *               *seasonal_coef is NULL (the caller owns it and must free it)
 *
 * Returns 0 on success, -1 on allocation/seek/read failure with
 * rrd_set_error() set. */
int lookup_seasonal(
    rrd_t *rrd,
    unsigned long rra_idx,
    unsigned long rra_start,
    rrd_file_t *rrd_file,
    unsigned long offset,
    rrd_value_t **seasonal_coef)
{
    unsigned long pos_tmp;

    /* rra_ptr[].cur_row points to the rra row to be written; this function
     * reads cur_row + offset */
    unsigned long row_idx = rrd->rra_ptr[rra_idx].cur_row + offset;

    /* handle wrap around */
    if (row_idx >= rrd->rra_def[rra_idx].row_cnt)
        row_idx = row_idx % (rrd->rra_def[rra_idx].row_cnt);

    /* rra_start points to the appropriate rra block in the file */
    /* compute the pointer to the appropriate location in the file */
    pos_tmp =
        rra_start +
        (row_idx) * (rrd->stat_head->ds_cnt) * sizeof(rrd_value_t);

    /* allocate memory if need be; one coefficient per data source */
    if (*seasonal_coef == NULL)
        *seasonal_coef =
            (rrd_value_t *) malloc((rrd->stat_head->ds_cnt) *
                                   sizeof(rrd_value_t));
    if (*seasonal_coef == NULL) {
        rrd_set_error("memory allocation failure: seasonal coef");
        return -1;
    }

    /* a zero return from rrd_seek means success (cf. the error branch) */
    if (!rrd_seek(rrd_file, pos_tmp, SEEK_SET)) {
        if (rrd_read
            (rrd_file, *seasonal_coef,
             sizeof(rrd_value_t) * rrd->stat_head->ds_cnt)
            == (ssize_t) (sizeof(rrd_value_t) * rrd->stat_head->ds_cnt)) {
            /* success! */
            /* we can safely ignore the rule requiring a seek operation between read
             * and write, because this read moves the file pointer to somewhere
             * in the file other than the next write location.
             * */
            return 0;
        } else {
            rrd_set_error("read operation failed in lookup_seasonal(): %lu\n",
                          pos_tmp);
        }
    } else {
        rrd_set_error("seek operation failed in lookup_seasonal(): %lu\n",
                      pos_tmp);
    }

    return -1;
}
82 /* For the specified CDP prep area and the FAILURES RRA,
83 * erase all history of past violations.
84 */
85 void erase_violations(
86 rrd_t *rrd,
87 unsigned long cdp_idx,
88 unsigned long rra_idx)
89 {
90 unsigned short i;
91 char *violations_array;
93 /* check that rra_idx is a CF_FAILURES array */
94 if (cf_conv(rrd->rra_def[rra_idx].cf_nam) != CF_FAILURES) {
95 #ifdef DEBUG
96 fprintf(stderr, "erase_violations called for non-FAILURES RRA: %s\n",
97 rrd->rra_def[rra_idx].cf_nam);
98 #endif
99 return;
100 }
101 #ifdef DEBUG
102 fprintf(stderr, "scratch buffer before erase:\n");
103 for (i = 0; i < MAX_CDP_PAR_EN; i++) {
104 fprintf(stderr, "%lu ", rrd->cdp_prep[cdp_idx].scratch[i].u_cnt);
105 }
106 fprintf(stderr, "\n");
107 #endif
109 /* WARNING: an array of longs on disk is treated as an array of chars
110 * in memory. */
111 violations_array = (char *) ((void *) rrd->cdp_prep[cdp_idx].scratch);
112 /* erase everything in the part of the CDP scratch array that will be
113 * used to store violations for the current window */
114 for (i = rrd->rra_def[rra_idx].par[RRA_window_len].u_cnt; i > 0; i--) {
115 violations_array[i - 1] = 0;
116 }
117 #ifdef DEBUG
118 fprintf(stderr, "scratch buffer after erase:\n");
119 for (i = 0; i < MAX_CDP_PAR_EN; i++) {
120 fprintf(stderr, "%lu ", rrd->cdp_prep[cdp_idx].scratch[i].u_cnt);
121 }
122 fprintf(stderr, "\n");
123 #endif
124 }
126 /* Smooth a periodic array with a moving average: equal weights and
127 * length = 5% of the period. */
128 int apply_smoother(
129 rrd_t *rrd,
130 unsigned long rra_idx,
131 unsigned long rra_start,
132 rrd_file_t *rrd_file)
133 {
134 unsigned long i, j, k;
135 unsigned long totalbytes;
136 rrd_value_t *rrd_values;
137 unsigned long row_length = rrd->stat_head->ds_cnt;
138 unsigned long row_count = rrd->rra_def[rra_idx].row_cnt;
139 unsigned long offset;
140 FIFOqueue **buffers;
141 rrd_value_t *working_average;
142 rrd_value_t *baseline;
144 if (atoi(rrd->stat_head->version) >= 4) {
145 offset = floor(rrd->rra_def[rra_idx].
146 par[RRA_seasonal_smoothing_window].
147 u_val / 2 * row_count);
148 } else {
149 offset = floor(0.05 / 2 * row_count);
150 }
152 if (offset == 0)
153 return 0; /* no smoothing */
155 /* allocate memory */
156 totalbytes = sizeof(rrd_value_t) * row_length * row_count;
157 rrd_values = (rrd_value_t *) malloc(totalbytes);
158 if (rrd_values == NULL) {
159 rrd_set_error("apply smoother: memory allocation failure");
160 return -1;
161 }
163 /* rra_start is at the beginning of this rra */
164 if (rrd_seek(rrd_file, rra_start, SEEK_SET)) {
165 rrd_set_error("seek to rra %d failed", rra_start);
166 free(rrd_values);
167 return -1;
168 }
169 rrd_flush(rrd_file);
170 /* could read all data in a single block, but we need to
171 * check for NA values */
172 for (i = 0; i < row_count; ++i) {
173 for (j = 0; j < row_length; ++j) {
174 if (rrd_read
175 (rrd_file, &(rrd_values[i * row_length + j]),
176 sizeof(rrd_value_t) * 1)
177 != (ssize_t) (sizeof(rrd_value_t) * 1)) {
178 rrd_set_error("reading value failed: %s",
179 rrd_strerror(errno));
180 }
181 if (isnan(rrd_values[i * row_length + j])) {
182 /* can't apply smoothing, still uninitialized values */
183 #ifdef DEBUG
184 fprintf(stderr,
185 "apply_smoother: NA detected in seasonal array: %ld %ld\n",
186 i, j);
187 #endif
188 free(rrd_values);
189 return 0;
190 }
191 }
192 }
194 /* allocate queues, one for each data source */
195 buffers = (FIFOqueue **) malloc(sizeof(FIFOqueue *) * row_length);
196 for (i = 0; i < row_length; ++i) {
197 queue_alloc(&(buffers[i]), 2 * offset + 1);
198 }
199 /* need working average initialized to 0 */
200 working_average = (rrd_value_t *) calloc(row_length, sizeof(rrd_value_t));
201 baseline = (rrd_value_t *) calloc(row_length, sizeof(rrd_value_t));
203 /* compute sums of the first 2*offset terms */
204 for (i = 0; i < 2 * offset; ++i) {
205 k = MyMod(i - offset, row_count);
206 for (j = 0; j < row_length; ++j) {
207 queue_push(buffers[j], rrd_values[k * row_length + j]);
208 working_average[j] += rrd_values[k * row_length + j];
209 }
210 }
212 /* compute moving averages */
213 for (i = offset; i < row_count + offset; ++i) {
214 for (j = 0; j < row_length; ++j) {
215 k = MyMod(i, row_count);
216 /* add a term to the sum */
217 working_average[j] += rrd_values[k * row_length + j];
218 queue_push(buffers[j], rrd_values[k * row_length + j]);
220 /* reset k to be the center of the window */
221 k = MyMod(i - offset, row_count);
222 /* overwrite rdd_values entry, the old value is already
223 * saved in buffers */
224 rrd_values[k * row_length + j] =
225 working_average[j] / (2 * offset + 1);
226 baseline[j] += rrd_values[k * row_length + j];
228 /* remove a term from the sum */
229 working_average[j] -= queue_pop(buffers[j]);
230 }
231 }
233 for (i = 0; i < row_length; ++i) {
234 queue_dealloc(buffers[i]);
235 baseline[i] /= row_count;
236 }
237 free(buffers);
238 free(working_average);
240 if (cf_conv(rrd->rra_def[rra_idx].cf_nam) == CF_SEASONAL) {
241 rrd_value_t (
242 *init_seasonality) (
243 rrd_value_t seasonal_coef,
244 rrd_value_t intercept);
246 switch (cf_conv(rrd->rra_def[hw_dep_idx(rrd, rra_idx)].cf_nam)) {
247 case CF_HWPREDICT:
248 init_seasonality = hw_additive_init_seasonality;
249 break;
250 case CF_MHWPREDICT:
251 init_seasonality = hw_multiplicative_init_seasonality;
252 break;
253 default:
254 rrd_set_error("apply smoother: SEASONAL rra doesn't have "
255 "valid dependency: %s",
256 rrd->rra_def[hw_dep_idx(rrd, rra_idx)].cf_nam);
257 return -1;
258 }
260 for (j = 0; j < row_length; ++j) {
261 for (i = 0; i < row_count; ++i) {
262 rrd_values[i * row_length + j] =
263 init_seasonality(rrd_values[i * row_length + j],
264 baseline[j]);
265 }
266 /* update the baseline coefficient,
267 * first, compute the cdp_index. */
268 offset = hw_dep_idx(rrd, rra_idx) * row_length + j;
269 (rrd->cdp_prep[offset]).scratch[CDP_hw_intercept].u_val +=
270 baseline[j];
271 }
272 /* flush cdp to disk */
273 rrd_flush(rrd_file);
274 if (rrd_seek(rrd_file, sizeof(stat_head_t) +
275 rrd->stat_head->ds_cnt * sizeof(ds_def_t) +
276 rrd->stat_head->rra_cnt * sizeof(rra_def_t) +
277 sizeof(live_head_t) +
278 rrd->stat_head->ds_cnt * sizeof(pdp_prep_t), SEEK_SET)) {
279 rrd_set_error("apply_smoother: seek to cdp_prep failed");
280 free(rrd_values);
281 return -1;
282 }
283 if (rrd_write(rrd_file, rrd->cdp_prep,
284 sizeof(cdp_prep_t) *
285 (rrd->stat_head->rra_cnt) * rrd->stat_head->ds_cnt)
286 != (ssize_t) (sizeof(cdp_prep_t) * (rrd->stat_head->rra_cnt) *
287 (rrd->stat_head->ds_cnt))) {
288 rrd_set_error("apply_smoother: cdp_prep write failed");
289 free(rrd_values);
290 return -1;
291 }
292 }
294 /* endif CF_SEASONAL */
295 /* flush updated values to disk */
296 rrd_flush(rrd_file);
297 if (rrd_seek(rrd_file, rra_start, SEEK_SET)) {
298 rrd_set_error("apply_smoother: seek to pos %d failed", rra_start);
299 free(rrd_values);
300 return -1;
301 }
302 /* write as a single block */
303 if (rrd_write
304 (rrd_file, rrd_values, sizeof(rrd_value_t) * row_length * row_count)
305 != (ssize_t) (sizeof(rrd_value_t) * row_length * row_count)) {
306 rrd_set_error("apply_smoother: write failed to %lu", rra_start);
307 free(rrd_values);
308 return -1;
309 }
311 rrd_flush(rrd_file);
312 free(rrd_values);
313 free(baseline);
314 return 0;
315 }
317 /* Reset aberrant behavior model coefficients, including intercept, slope,
318 * seasonal, and seasonal deviation for the specified data source. */
/* Reset aberrant behavior model coefficients, including intercept, slope,
 * seasonal, and seasonal deviation for the specified data source.
 *
 * Walks every RRA: HWPREDICT/MHWPREDICT cdp state is re-initialized,
 * SEASONAL/DEVSEASONAL on-disk rows for this data source are overwritten
 * with NaN, FAILURES violation history is erased.  Finally the whole
 * cdp_prep area is written back to disk.
 *
 * NOTE(review): the rrd_seek() calls inside the loop are unchecked; a
 * failed seek would silently corrupt the wrong file region — presumably
 * acceptable because the offsets are computed from the header just read. */
void reset_aberrant_coefficients(
    rrd_t *rrd,
    rrd_file_t *rrd_file,
    unsigned long ds_idx)
{
    unsigned long cdp_idx, rra_idx, i;
    unsigned long cdp_start, rra_start;
    rrd_value_t nan_buffer = DNAN;

    /* compute the offset for the cdp area */
    cdp_start = sizeof(stat_head_t) +
        rrd->stat_head->ds_cnt * sizeof(ds_def_t) +
        rrd->stat_head->rra_cnt * sizeof(rra_def_t) +
        sizeof(live_head_t) + rrd->stat_head->ds_cnt * sizeof(pdp_prep_t);
    /* compute the offset for the first rra */
    rra_start = cdp_start +
        (rrd->stat_head->ds_cnt) * (rrd->stat_head->rra_cnt) *
        sizeof(cdp_prep_t) + rrd->stat_head->rra_cnt * sizeof(rra_ptr_t);

    /* loop over the RRAs */
    for (rra_idx = 0; rra_idx < rrd->stat_head->rra_cnt; rra_idx++) {
        /* cdp_prep is laid out rra-major: one entry per (rra, ds) pair */
        cdp_idx = rra_idx * (rrd->stat_head->ds_cnt) + ds_idx;
        switch (cf_conv(rrd->rra_def[rra_idx].cf_nam)) {
        case CF_HWPREDICT:
        case CF_MHWPREDICT:
            init_hwpredict_cdp(&(rrd->cdp_prep[cdp_idx]));
            break;
        case CF_SEASONAL:
        case CF_DEVSEASONAL:
            /* don't use init_seasonal because it will reset burn-in, which
             * means different data sources will be calling for the smoother
             * at different times. */
            rrd->cdp_prep[cdp_idx].scratch[CDP_hw_seasonal].u_val = DNAN;
            rrd->cdp_prep[cdp_idx].scratch[CDP_hw_last_seasonal].u_val = DNAN;
            /* move to first entry of data source for this rra */
            rrd_seek(rrd_file, rra_start + ds_idx * sizeof(rrd_value_t),
                     SEEK_SET);
            /* entries for the same data source are not contiguous,
             * temporal entries are contiguous */
            for (i = 0; i < rrd->rra_def[rra_idx].row_cnt; ++i) {
                /* write one NaN, then skip over the other data sources'
                 * columns to reach this data source's next row */
                if (rrd_write(rrd_file, &nan_buffer, sizeof(rrd_value_t) * 1)
                    != sizeof(rrd_value_t) * 1) {
                    rrd_set_error
                        ("reset_aberrant_coefficients: write failed data source %lu rra %s",
                         ds_idx, rrd->rra_def[rra_idx].cf_nam);
                    return;
                }
                rrd_seek(rrd_file, (rrd->stat_head->ds_cnt - 1) *
                         sizeof(rrd_value_t), SEEK_CUR);
            }
            break;
        case CF_FAILURES:
            erase_violations(rrd, cdp_idx, rra_idx);
            break;
        default:
            break;
        }
        /* move offset to the next rra */
        rra_start += rrd->rra_def[rra_idx].row_cnt * rrd->stat_head->ds_cnt *
            sizeof(rrd_value_t);
    }
    /* persist the updated cdp_prep area in one block write */
    rrd_seek(rrd_file, cdp_start, SEEK_SET);
    if (rrd_write(rrd_file, rrd->cdp_prep,
                  sizeof(cdp_prep_t) *
                  (rrd->stat_head->rra_cnt) * rrd->stat_head->ds_cnt)
        != (ssize_t) (sizeof(cdp_prep_t) * (rrd->stat_head->rra_cnt) *
                      (rrd->stat_head->ds_cnt))) {
        rrd_set_error("reset_aberrant_coefficients: cdp_prep write failed");
    }
}
390 void init_hwpredict_cdp(
391 cdp_prep_t *cdp)
392 {
393 cdp->scratch[CDP_hw_intercept].u_val = DNAN;
394 cdp->scratch[CDP_hw_last_intercept].u_val = DNAN;
395 cdp->scratch[CDP_hw_slope].u_val = DNAN;
396 cdp->scratch[CDP_hw_last_slope].u_val = DNAN;
397 cdp->scratch[CDP_null_count].u_cnt = 1;
398 cdp->scratch[CDP_last_null_count].u_cnt = 1;
399 }
401 void init_seasonal_cdp(
402 cdp_prep_t *cdp)
403 {
404 cdp->scratch[CDP_hw_seasonal].u_val = DNAN;
405 cdp->scratch[CDP_hw_last_seasonal].u_val = DNAN;
406 cdp->scratch[CDP_init_seasonal].u_cnt = 1;
407 }
/* Dispatch one primary data point to the aberrant-behavior update routine
 * matching the consolidation function of the current RRA.
 *
 * pdp_val         the new primary data point value
 * current_cf      consolidation function of the RRA at rra_idx
 * cdp_idx         index into cdp_prep for this (rra, ds) pair
 * rra_idx         index of the current RRA
 * ds_idx          index of the current data source
 * CDP_scratch_idx scratch slot where pdp_val is stored for the updaters
 * seasonal_coef   seasonal coefficients row (used by the seasonal CFs)
 *
 * SEASONAL/DEVSEASONAL/FAILURES pick the additive or multiplicative
 * function table by inspecting the CF of the RRA they depend on (FAILURES
 * looks two dependency links deep).  Returns the update routine's result,
 * -1 on an invalid dependency, 0 for CFs with no aberrant state. */
int update_aberrant_CF(
    rrd_t *rrd,
    rrd_value_t pdp_val,
    enum cf_en current_cf,
    unsigned long cdp_idx,
    unsigned long rra_idx,
    unsigned long ds_idx,
    unsigned short CDP_scratch_idx,
    rrd_value_t *seasonal_coef)
{
    /* function table for multiplicative (MHWPREDICT) Holt-Winters */
    static hw_functions_t hw_multiplicative_functions = {
        hw_multiplicative_calculate_prediction,
        hw_multiplicative_calculate_intercept,
        hw_calculate_slope,
        hw_multiplicative_calculate_seasonality,
        hw_multiplicative_init_seasonality,
        hw_calculate_seasonal_deviation,
        hw_init_seasonal_deviation,
        1.0             /* identity value */
    };

    /* function table for additive (HWPREDICT) Holt-Winters */
    static hw_functions_t hw_additive_functions = {
        hw_additive_calculate_prediction,
        hw_additive_calculate_intercept,
        hw_calculate_slope,
        hw_additive_calculate_seasonality,
        hw_additive_init_seasonality,
        hw_calculate_seasonal_deviation,
        hw_init_seasonal_deviation,
        0.0             /* identity value */
    };

    /* stash the new PDP value where the update routines expect it */
    rrd->cdp_prep[cdp_idx].scratch[CDP_scratch_idx].u_val = pdp_val;
    switch (current_cf) {
    case CF_HWPREDICT:
        return update_hwpredict(rrd, cdp_idx, rra_idx, ds_idx,
                                CDP_scratch_idx, &hw_additive_functions);
    case CF_MHWPREDICT:
        return update_hwpredict(rrd, cdp_idx, rra_idx, ds_idx,
                                CDP_scratch_idx,
                                &hw_multiplicative_functions);
    case CF_DEVPREDICT:
        return update_devpredict(rrd, cdp_idx, rra_idx, ds_idx,
                                 CDP_scratch_idx);
    case CF_SEASONAL:
        /* model type comes from the dependent HWPREDICT/MHWPREDICT rra */
        switch (cf_conv(rrd->rra_def[hw_dep_idx(rrd, rra_idx)].cf_nam)) {
        case CF_HWPREDICT:
            return update_seasonal(rrd, cdp_idx, rra_idx, ds_idx,
                                   CDP_scratch_idx, seasonal_coef,
                                   &hw_additive_functions);
        case CF_MHWPREDICT:
            return update_seasonal(rrd, cdp_idx, rra_idx, ds_idx,
                                   CDP_scratch_idx, seasonal_coef,
                                   &hw_multiplicative_functions);
        default:
            return -1;
        }
    case CF_DEVSEASONAL:
        switch (cf_conv(rrd->rra_def[hw_dep_idx(rrd, rra_idx)].cf_nam)) {
        case CF_HWPREDICT:
            return update_devseasonal(rrd, cdp_idx, rra_idx, ds_idx,
                                      CDP_scratch_idx, seasonal_coef,
                                      &hw_additive_functions);
        case CF_MHWPREDICT:
            return update_devseasonal(rrd, cdp_idx, rra_idx, ds_idx,
                                      CDP_scratch_idx, seasonal_coef,
                                      &hw_multiplicative_functions);
        default:
            return -1;
        }
    case CF_FAILURES:
        /* FAILURES depends on DEVSEASONAL which depends on the predictor,
         * hence the doubled hw_dep_idx */
        switch (cf_conv
                (rrd->rra_def[hw_dep_idx(rrd, hw_dep_idx(rrd, rra_idx))].
                 cf_nam)) {
        case CF_HWPREDICT:
            return update_failures(rrd, cdp_idx, rra_idx, ds_idx,
                                   CDP_scratch_idx, &hw_additive_functions);
        case CF_MHWPREDICT:
            return update_failures(rrd, cdp_idx, rra_idx, ds_idx,
                                   CDP_scratch_idx,
                                   &hw_multiplicative_functions);
        default:
            return -1;
        }
    case CF_AVERAGE:
    default:
        /* no aberrant-behavior state to maintain for plain CFs */
        return 0;
    }
    return -1;
}
/* Canonical (always non-negative) modulus: returns val mod `mod` in the
 * range [0, mod), even when val is negative.  C's % operator yields a
 * negative remainder for negative operands, hence this helper.
 *
 * BUGFIX: the previous implementation (a) called abs(), whose int
 * parameter truncates a long argument on LP64 platforms, and (b) returned
 * `mod` instead of 0 when val was a negative exact multiple of mod —
 * an out-of-range result that callers use as a row index. */
static unsigned long MyMod(
    signed long val,
    unsigned long mod)
{
    signed long rem = val % (signed long) mod;

    if (rem < 0)
        rem += (signed long) mod;
    return (unsigned long) rem;
}
517 /* a standard fixed-capacity FIF0 queue implementation
518 * No overflow checking is performed. */
519 int queue_alloc(
520 FIFOqueue **q,
521 int capacity)
522 {
523 *q = (FIFOqueue *) malloc(sizeof(FIFOqueue));
524 if (*q == NULL)
525 return -1;
526 (*q)->queue = (rrd_value_t *) malloc(sizeof(rrd_value_t) * capacity);
527 if ((*q)->queue == NULL) {
528 free(*q);
529 return -1;
530 }
531 (*q)->capacity = capacity;
532 (*q)->head = capacity;
533 (*q)->tail = 0;
534 return 0;
535 }
537 int queue_isempty(
538 FIFOqueue *q)
539 {
540 return (q->head % q->capacity == q->tail);
541 }
543 void queue_push(
544 FIFOqueue *q,
545 rrd_value_t value)
546 {
547 q->queue[(q->tail)++] = value;
548 q->tail = q->tail % q->capacity;
549 }
551 rrd_value_t queue_pop(
552 FIFOqueue *q)
553 {
554 q->head = q->head % q->capacity;
555 return q->queue[(q->head)++];
556 }
/* Release a queue allocated by queue_alloc: first its value storage,
 * then the queue structure itself. */
void queue_dealloc(
    FIFOqueue *q)
{
    free(q->queue);
    free(q);
}