/*****************************************************************************
 * RRDtool 1.4.1  Copyright by Tobi Oetiker, 1997-2009
 *****************************************************************************
 * rrd_hw.c : Support for Holt-Winters Smoothing / Aberrant Behavior Detection
 *****************************************************************************
 * Initial version by Jake Brutlag, WebTV Networks, 5/1/00
 *****************************************************************************/

#include <stdlib.h>

#include "rrd_tool.h"
#include "rrd_hw.h"
#include "rrd_hw_math.h"
#include "rrd_hw_update.h"
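
/* Explanatory note: hw_dep_idx yields the index of the RRA on which the
 * given RRA depends (e.g. a SEASONAL RRA points back at its HWPREDICT or
 * MHWPREDICT RRA); the link is stored in the RRA's parameter area. */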
#define hw_dep_idx(rrd, rra_idx) rrd->rra_def[rra_idx].par[RRA_dependent_rra_idx].u_cnt

/* #define DEBUG */

/* private functions */
static unsigned long MyMod(
    signed long val,
    unsigned long mod);

int lookup_seasonal(
    rrd_t *rrd,
    unsigned long rra_idx,
    unsigned long rra_start,
    rrd_file_t *rrd_file,
    unsigned long offset,
    rrd_value_t **seasonal_coef)
{
    unsigned long pos_tmp;

    /* rra_ptr[].cur_row points to the rra row to be written; this function
     * reads cur_row + offset */
    unsigned long row_idx = rrd->rra_ptr[rra_idx].cur_row + offset;

    /* handle wrap around */
    if (row_idx >= rrd->rra_def[rra_idx].row_cnt)
        row_idx = row_idx % (rrd->rra_def[rra_idx].row_cnt);

    /* rra_start points to the appropriate rra block in the file */
    /* compute the pointer to the appropriate location in the file */
    pos_tmp =
        rra_start +
        (row_idx) * (rrd->stat_head->ds_cnt) * sizeof(rrd_value_t);

    /* allocate memory if need be */
    if (*seasonal_coef == NULL)
        *seasonal_coef =
            (rrd_value_t *) malloc((rrd->stat_head->ds_cnt) *
                                   sizeof(rrd_value_t));
    if (*seasonal_coef == NULL) {
        rrd_set_error("memory allocation failure: seasonal coef");
        return -1;
    }

    if (!rrd_seek(rrd_file, pos_tmp, SEEK_SET)) {
        if (rrd_read
            (rrd_file, *seasonal_coef,
             sizeof(rrd_value_t) * rrd->stat_head->ds_cnt)
            == (ssize_t) (sizeof(rrd_value_t) * rrd->stat_head->ds_cnt)) {
            /* success! */
            /* we can safely ignore the rule requiring a seek operation
             * between read and write, because this read moves the file
             * pointer to somewhere in the file other than the next write
             * location. */
            return 0;
        } else {
            rrd_set_error("read operation failed in lookup_seasonal(): %lu\n",
                          pos_tmp);
        }
    } else {
        rrd_set_error("seek operation failed in lookup_seasonal(): %lu\n",
                      pos_tmp);
    }

    return -1;
}

/* For the specified CDP prep area and the FAILURES RRA,
 * erase all history of past violations.
 */
void erase_violations(
    rrd_t *rrd,
    unsigned long cdp_idx,
    unsigned long rra_idx)
{
    unsigned short i;
    char *violations_array;

    /* check that rra_idx is a CF_FAILURES array */
    if (cf_conv(rrd->rra_def[rra_idx].cf_nam) != CF_FAILURES) {
#ifdef DEBUG
        fprintf(stderr, "erase_violations called for non-FAILURES RRA: %s\n",
                rrd->rra_def[rra_idx].cf_nam);
#endif
        return;
    }
#ifdef DEBUG
    fprintf(stderr, "scratch buffer before erase:\n");
    for (i = 0; i < MAX_CDP_PAR_EN; i++) {
        fprintf(stderr, "%lu ", rrd->cdp_prep[cdp_idx].scratch[i].u_cnt);
    }
    fprintf(stderr, "\n");
#endif

    /* WARNING: an array of longs on disk is treated as an array of chars
     * in memory. */
    violations_array = (char *) ((void *) rrd->cdp_prep[cdp_idx].scratch);
    /* erase everything in the part of the CDP scratch array that will be
     * used to store violations for the current window */
    for (i = rrd->rra_def[rra_idx].par[RRA_window_len].u_cnt; i > 0; i--) {
        violations_array[i - 1] = 0;
    }
#ifdef DEBUG
    fprintf(stderr, "scratch buffer after erase:\n");
    for (i = 0; i < MAX_CDP_PAR_EN; i++) {
        fprintf(stderr, "%lu ", rrd->cdp_prep[cdp_idx].scratch[i].u_cnt);
    }
    fprintf(stderr, "\n");
#endif
}

/* Smooth a periodic array with a moving average: equal weights and
 * length = 5% of the period. */
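/* Illustrative arithmetic (not in the original source): for a seasonal
 * period of row_count = 288 rows, the default 5% smoothing window gives
 * offset = floor(0.05 / 2 * 288) = 7, so each entry is replaced by the
 * equally weighted average of the 2 * 7 + 1 = 15 entries centered on it.
 * For version >= 4 files the same formula is applied with the per-RRA
 * RRA_seasonal_smoothing_window parameter in place of 0.05. */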
int apply_smoother(
    rrd_t *rrd,
    unsigned long rra_idx,
    unsigned long rra_start,
    rrd_file_t *rrd_file)
{
    unsigned long i, j, k;
    unsigned long totalbytes;
    rrd_value_t *rrd_values;
    unsigned long row_length = rrd->stat_head->ds_cnt;
    unsigned long row_count = rrd->rra_def[rra_idx].row_cnt;
    unsigned long offset;
    FIFOqueue **buffers;
    rrd_value_t *working_average;
    rrd_value_t *baseline;

    if (atoi(rrd->stat_head->version) >= 4) {
        offset = floor(rrd->rra_def[rra_idx].
                       par[RRA_seasonal_smoothing_window].u_val / 2 *
                       row_count);
    } else {
        offset = floor(0.05 / 2 * row_count);
    }

    if (offset == 0)
        return 0;       /* no smoothing */

    /* allocate memory */
    totalbytes = sizeof(rrd_value_t) * row_length * row_count;
    rrd_values = (rrd_value_t *) malloc(totalbytes);
    if (rrd_values == NULL) {
        rrd_set_error("apply smoother: memory allocation failure");
        return -1;
    }

    /* rra_start is at the beginning of this rra */
    if (rrd_seek(rrd_file, rra_start, SEEK_SET)) {
        rrd_set_error("seek to rra %lu failed", rra_start);
        free(rrd_values);
        return -1;
    }

    /* could read all data in a single block, but we need to
     * check for NA values */
    for (i = 0; i < row_count; ++i) {
        for (j = 0; j < row_length; ++j) {
            if (rrd_read
                (rrd_file, &(rrd_values[i * row_length + j]),
                 sizeof(rrd_value_t) * 1)
                != (ssize_t) (sizeof(rrd_value_t) * 1)) {
                rrd_set_error("reading value failed: %s",
                              rrd_strerror(errno));
                free(rrd_values);
                return -1;
            }
            if (isnan(rrd_values[i * row_length + j])) {
                /* can't apply smoothing, still uninitialized values */
#ifdef DEBUG
                fprintf(stderr,
                        "apply_smoother: NA detected in seasonal array: %lu %lu\n",
                        i, j);
#endif
                free(rrd_values);
                return 0;
            }
        }
    }

    /* allocate queues, one for each data source */
    buffers = (FIFOqueue **) malloc(sizeof(FIFOqueue *) * row_length);
    if (buffers == NULL) {
        rrd_set_error("apply smoother: memory allocation failure");
        free(rrd_values);
        return -1;
    }
    for (i = 0; i < row_length; ++i) {
        queue_alloc(&(buffers[i]), 2 * offset + 1);
    }
    /* need working average initialized to 0 */
    working_average = (rrd_value_t *) calloc(row_length, sizeof(rrd_value_t));
    baseline = (rrd_value_t *) calloc(row_length, sizeof(rrd_value_t));
    if (working_average == NULL || baseline == NULL) {
        /* bail out cleanly if either allocation failed */
        rrd_set_error("apply smoother: memory allocation failure");
        for (i = 0; i < row_length; ++i)
            queue_dealloc(buffers[i]);
        free(buffers);
        free(working_average);
        free(baseline);
        free(rrd_values);
        return -1;
    }

    /* compute sums of the first 2*offset terms */
    for (i = 0; i < 2 * offset; ++i) {
        k = MyMod(i - offset, row_count);
        for (j = 0; j < row_length; ++j) {
            queue_push(buffers[j], rrd_values[k * row_length + j]);
            working_average[j] += rrd_values[k * row_length + j];
        }
    }
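
    /* Loop invariant (explanatory note): after this priming pass,
     * working_average[j] holds the sum of the 2*offset values at indices
     * -offset .. offset-1 (mod row_count). Each iteration of the main loop
     * below adds the leading term at index i, stores the average of the
     * full 2*offset+1 window centered at i-offset, then pops the trailing
     * term off the queue. */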
    /* compute moving averages */
    for (i = offset; i < row_count + offset; ++i) {
        for (j = 0; j < row_length; ++j) {
            k = MyMod(i, row_count);
            /* add a term to the sum */
            working_average[j] += rrd_values[k * row_length + j];
            queue_push(buffers[j], rrd_values[k * row_length + j]);

            /* reset k to be the center of the window */
            k = MyMod(i - offset, row_count);
            /* overwrite rrd_values entry; the old value is already
             * saved in buffers */
            rrd_values[k * row_length + j] =
                working_average[j] / (2 * offset + 1);
            baseline[j] += rrd_values[k * row_length + j];

            /* remove a term from the sum */
            working_average[j] -= queue_pop(buffers[j]);
        }
    }

    for (i = 0; i < row_length; ++i) {
        queue_dealloc(buffers[i]);
        baseline[i] /= row_count;
    }
    free(buffers);
    free(working_average);

    if (cf_conv(rrd->rra_def[rra_idx].cf_nam) == CF_SEASONAL) {
        rrd_value_t (*init_seasonality) (rrd_value_t seasonal_coef,
                                         rrd_value_t intercept);

        switch (cf_conv(rrd->rra_def[hw_dep_idx(rrd, rra_idx)].cf_nam)) {
        case CF_HWPREDICT:
            init_seasonality = hw_additive_init_seasonality;
            break;
        case CF_MHWPREDICT:
            init_seasonality = hw_multiplicative_init_seasonality;
            break;
        default:
            rrd_set_error("apply smoother: SEASONAL rra doesn't have "
                          "valid dependency: %s",
                          rrd->rra_def[hw_dep_idx(rrd, rra_idx)].cf_nam);
            free(rrd_values);
            free(baseline);
            return -1;
        }

        for (j = 0; j < row_length; ++j) {
            for (i = 0; i < row_count; ++i) {
                rrd_values[i * row_length + j] =
                    init_seasonality(rrd_values[i * row_length + j],
                                     baseline[j]);
            }
            /* update the baseline coefficient,
             * first, compute the cdp_index. */
            offset = hw_dep_idx(rrd, rra_idx) * row_length + j;
            (rrd->cdp_prep[offset]).scratch[CDP_hw_intercept].u_val +=
                baseline[j];
        }
        /* flush cdp to disk */
        if (rrd_seek(rrd_file, sizeof(stat_head_t) +
                     rrd->stat_head->ds_cnt * sizeof(ds_def_t) +
                     rrd->stat_head->rra_cnt * sizeof(rra_def_t) +
                     sizeof(live_head_t) +
                     rrd->stat_head->ds_cnt * sizeof(pdp_prep_t), SEEK_SET)) {
            rrd_set_error("apply_smoother: seek to cdp_prep failed");
            free(rrd_values);
            free(baseline);
            return -1;
        }
        if (rrd_write(rrd_file, rrd->cdp_prep,
                      sizeof(cdp_prep_t) *
                      (rrd->stat_head->rra_cnt) * rrd->stat_head->ds_cnt)
            != (ssize_t) (sizeof(cdp_prep_t) * (rrd->stat_head->rra_cnt) *
                          (rrd->stat_head->ds_cnt))) {
            rrd_set_error("apply_smoother: cdp_prep write failed");
            free(rrd_values);
            free(baseline);
            return -1;
        }
    }

    /* endif CF_SEASONAL */
    /* flush updated values to disk */
    if (rrd_seek(rrd_file, rra_start, SEEK_SET)) {
        rrd_set_error("apply_smoother: seek to pos %lu failed", rra_start);
        free(rrd_values);
        free(baseline);
        return -1;
    }
    /* write as a single block */
    if (rrd_write
        (rrd_file, rrd_values, sizeof(rrd_value_t) * row_length * row_count)
        != (ssize_t) (sizeof(rrd_value_t) * row_length * row_count)) {
        rrd_set_error("apply_smoother: write failed to %lu", rra_start);
        free(rrd_values);
        free(baseline);
        return -1;
    }

    free(rrd_values);
    free(baseline);
    return 0;
}

/* Reset aberrant behavior model coefficients, including intercept, slope,
 * seasonal, and seasonal deviation for the specified data source. */
void reset_aberrant_coefficients(
    rrd_t *rrd,
    rrd_file_t *rrd_file,
    unsigned long ds_idx)
{
    unsigned long cdp_idx, rra_idx, i;
    unsigned long cdp_start, rra_start;
    rrd_value_t nan_buffer = DNAN;

    /* compute the offset for the cdp area */
    cdp_start = sizeof(stat_head_t) +
        rrd->stat_head->ds_cnt * sizeof(ds_def_t) +
        rrd->stat_head->rra_cnt * sizeof(rra_def_t) +
        sizeof(live_head_t) + rrd->stat_head->ds_cnt * sizeof(pdp_prep_t);
    /* compute the offset for the first rra */
    rra_start = cdp_start +
        (rrd->stat_head->ds_cnt) * (rrd->stat_head->rra_cnt) *
        sizeof(cdp_prep_t) + rrd->stat_head->rra_cnt * sizeof(rra_ptr_t);
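
    /* Explanatory sketch (not in the original source): the arithmetic
     * above walks the rrd on-disk layout that this file assumes:
     *   stat_head_t
     *   ds_def_t   x ds_cnt
     *   rra_def_t  x rra_cnt
     *   live_head_t
     *   pdp_prep_t x ds_cnt
     *   cdp_prep_t x (ds_cnt * rra_cnt)   <- cdp_start points here
     *   rra_ptr_t  x rra_cnt
     *   rra data rows                     <- rra_start points here */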
    /* loop over the RRAs */
    for (rra_idx = 0; rra_idx < rrd->stat_head->rra_cnt; rra_idx++) {
        cdp_idx = rra_idx * (rrd->stat_head->ds_cnt) + ds_idx;
        switch (cf_conv(rrd->rra_def[rra_idx].cf_nam)) {
        case CF_HWPREDICT:
        case CF_MHWPREDICT:
            init_hwpredict_cdp(&(rrd->cdp_prep[cdp_idx]));
            break;
        case CF_SEASONAL:
        case CF_DEVSEASONAL:
            /* don't use init_seasonal because it will reset burn-in, which
             * means different data sources will be calling for the smoother
             * at different times. */
            rrd->cdp_prep[cdp_idx].scratch[CDP_hw_seasonal].u_val = DNAN;
            rrd->cdp_prep[cdp_idx].scratch[CDP_hw_last_seasonal].u_val = DNAN;
            /* move to first entry of data source for this rra */
            rrd_seek(rrd_file, rra_start + ds_idx * sizeof(rrd_value_t),
                     SEEK_SET);
            /* entries for the same data source are not contiguous,
             * temporal entries are contiguous */
            for (i = 0; i < rrd->rra_def[rra_idx].row_cnt; ++i) {
                if (rrd_write(rrd_file, &nan_buffer, sizeof(rrd_value_t) * 1)
                    != (ssize_t) (sizeof(rrd_value_t) * 1)) {
                    rrd_set_error
                        ("reset_aberrant_coefficients: write failed data source %lu rra %s",
                         ds_idx, rrd->rra_def[rra_idx].cf_nam);
                    return;
                }
                rrd_seek(rrd_file, (rrd->stat_head->ds_cnt - 1) *
                         sizeof(rrd_value_t), SEEK_CUR);
            }
            break;
        case CF_FAILURES:
            erase_violations(rrd, cdp_idx, rra_idx);
            break;
        default:
            break;
        }
        /* move offset to the next rra */
        rra_start += rrd->rra_def[rra_idx].row_cnt * rrd->stat_head->ds_cnt *
            sizeof(rrd_value_t);
    }
    rrd_seek(rrd_file, cdp_start, SEEK_SET);
    if (rrd_write(rrd_file, rrd->cdp_prep,
                  sizeof(cdp_prep_t) *
                  (rrd->stat_head->rra_cnt) * rrd->stat_head->ds_cnt)
        != (ssize_t) (sizeof(cdp_prep_t) * (rrd->stat_head->rra_cnt) *
                      (rrd->stat_head->ds_cnt))) {
        rrd_set_error("reset_aberrant_coefficients: cdp_prep write failed");
    }
}

void init_hwpredict_cdp(
    cdp_prep_t *cdp)
{
    cdp->scratch[CDP_hw_intercept].u_val = DNAN;
    cdp->scratch[CDP_hw_last_intercept].u_val = DNAN;
    cdp->scratch[CDP_hw_slope].u_val = DNAN;
    cdp->scratch[CDP_hw_last_slope].u_val = DNAN;
    cdp->scratch[CDP_null_count].u_cnt = 1;
    cdp->scratch[CDP_last_null_count].u_cnt = 1;
}

void init_seasonal_cdp(
    cdp_prep_t *cdp)
{
    cdp->scratch[CDP_hw_seasonal].u_val = DNAN;
    cdp->scratch[CDP_hw_last_seasonal].u_val = DNAN;
    cdp->scratch[CDP_init_seasonal].u_cnt = 1;
}

int update_aberrant_CF(
    rrd_t *rrd,
    rrd_value_t pdp_val,
    enum cf_en current_cf,
    unsigned long cdp_idx,
    unsigned long rra_idx,
    unsigned long ds_idx,
    unsigned short CDP_scratch_idx,
    rrd_value_t *seasonal_coef)
{
    static hw_functions_t hw_multiplicative_functions = {
        hw_multiplicative_calculate_prediction,
        hw_multiplicative_calculate_intercept,
        hw_calculate_slope,
        hw_multiplicative_calculate_seasonality,
        hw_multiplicative_init_seasonality,
        hw_calculate_seasonal_deviation,
        hw_init_seasonal_deviation,
        1.0             /* identity value */
    };

    static hw_functions_t hw_additive_functions = {
        hw_additive_calculate_prediction,
        hw_additive_calculate_intercept,
        hw_calculate_slope,
        hw_additive_calculate_seasonality,
        hw_additive_init_seasonality,
        hw_calculate_seasonal_deviation,
        hw_init_seasonal_deviation,
        0.0             /* identity value */
    };

    rrd->cdp_prep[cdp_idx].scratch[CDP_scratch_idx].u_val = pdp_val;
    switch (current_cf) {
    case CF_HWPREDICT:
        return update_hwpredict(rrd, cdp_idx, rra_idx, ds_idx,
                                CDP_scratch_idx, &hw_additive_functions);
    case CF_MHWPREDICT:
        return update_hwpredict(rrd, cdp_idx, rra_idx, ds_idx,
                                CDP_scratch_idx,
                                &hw_multiplicative_functions);
    case CF_DEVPREDICT:
        return update_devpredict(rrd, cdp_idx, rra_idx, ds_idx,
                                 CDP_scratch_idx);
    case CF_SEASONAL:
        switch (cf_conv(rrd->rra_def[hw_dep_idx(rrd, rra_idx)].cf_nam)) {
        case CF_HWPREDICT:
            return update_seasonal(rrd, cdp_idx, rra_idx, ds_idx,
                                   CDP_scratch_idx, seasonal_coef,
                                   &hw_additive_functions);
        case CF_MHWPREDICT:
            return update_seasonal(rrd, cdp_idx, rra_idx, ds_idx,
                                   CDP_scratch_idx, seasonal_coef,
                                   &hw_multiplicative_functions);
        default:
            return -1;
        }
    case CF_DEVSEASONAL:
        switch (cf_conv(rrd->rra_def[hw_dep_idx(rrd, rra_idx)].cf_nam)) {
        case CF_HWPREDICT:
            return update_devseasonal(rrd, cdp_idx, rra_idx, ds_idx,
                                      CDP_scratch_idx, seasonal_coef,
                                      &hw_additive_functions);
        case CF_MHWPREDICT:
            return update_devseasonal(rrd, cdp_idx, rra_idx, ds_idx,
                                      CDP_scratch_idx, seasonal_coef,
                                      &hw_multiplicative_functions);
        default:
            return -1;
        }
    case CF_FAILURES:
        switch (cf_conv
                (rrd->rra_def[hw_dep_idx(rrd, hw_dep_idx(rrd, rra_idx))].
                 cf_nam)) {
        case CF_HWPREDICT:
            return update_failures(rrd, cdp_idx, rra_idx, ds_idx,
                                   CDP_scratch_idx, &hw_additive_functions);
        case CF_MHWPREDICT:
            return update_failures(rrd, cdp_idx, rra_idx, ds_idx,
                                   CDP_scratch_idx,
                                   &hw_multiplicative_functions);
        default:
            return -1;
        }
    case CF_AVERAGE:
    default:
        return 0;
    }
    return -1;
}

static unsigned long MyMod(
    signed long val,
    unsigned long mod)
{
    unsigned long new_val;

    if (val < 0)
        new_val = ((unsigned long) labs(val)) % mod;    /* labs: val is long */
    else
        new_val = (val % mod);

    if (val < 0)
        return (mod - new_val);
    else
        return (new_val);
}
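
/* Usage note for MyMod above (explanatory, not in the original source):
 * C's % operator truncates toward zero, so (-3) % 288 == -3; MyMod folds
 * negative values back into [0, mod), e.g. MyMod(-3, 288) == 285. Caveat:
 * for val a negative exact multiple of mod it would return mod rather
 * than 0, but the callers in apply_smoother only pass negative val with
 * |val| < mod, so that case never arises here. */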

/* a standard fixed-capacity FIFO queue implementation
 * No overflow checking is performed. */
int queue_alloc(
    FIFOqueue **q,
    int capacity)
{
    *q = (FIFOqueue *) malloc(sizeof(FIFOqueue));
    if (*q == NULL)
        return -1;
    (*q)->queue = (rrd_value_t *) malloc(sizeof(rrd_value_t) * capacity);
    if ((*q)->queue == NULL) {
        free(*q);
        return -1;
    }
    (*q)->capacity = capacity;
    (*q)->head = capacity;
    (*q)->tail = 0;
    return 0;
}

int queue_isempty(
    FIFOqueue *q)
{
    return (q->head % q->capacity == q->tail);
}

void queue_push(
    FIFOqueue *q,
    rrd_value_t value)
{
    q->queue[(q->tail)++] = value;
    q->tail = q->tail % q->capacity;
}

rrd_value_t queue_pop(
    FIFOqueue *q)
{
    q->head = q->head % q->capacity;
    return q->queue[(q->head)++];
}

void queue_dealloc(
    FIFOqueue *q)
{
    free(q->queue);
    free(q);
}
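
#ifdef DEBUG
/* Minimal usage sketch of the FIFO queue (illustrative only; this
 * function is hypothetical and not part of the library). Values come
 * back out in insertion order. Note that a completely full queue is
 * indistinguishable from an empty one under queue_isempty, so callers
 * keep the element count below capacity when they rely on that test;
 * apply_smoother never calls queue_isempty and is unaffected. */
static void queue_usage_sketch(void)
{
    FIFOqueue *q = NULL;

    if (queue_alloc(&q, 3) != 0)
        return;
    queue_push(q, 1.0);
    queue_push(q, 2.0);
    /* prints 1.000000 then 2.000000 */
    while (!queue_isempty(q))
        fprintf(stderr, "%f\n", queue_pop(q));
    queue_dealloc(q);
}
#endif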