/*****************************************************************************
 * RRDtool 1.3rc7 Copyright by Tobi Oetiker, 1997-2008
 *****************************************************************************
 * rrd_hw.c : Support for Holt-Winters Smoothing / Aberrant Behavior Detection
 *****************************************************************************
 * Initial version by Jake Brutlag, WebTV Networks, 5/1/00
 *****************************************************************************/

#include "rrd_tool.h"
#include "rrd_hw.h"
#include "rrd_hw_math.h"
#include "rrd_hw_update.h"

#define hw_dep_idx(rrd, rra_idx) rrd->rra_def[rra_idx].par[RRA_dependent_rra_idx].u_cnt

/* #define DEBUG */

/* private functions */
static unsigned long MyMod(
    signed long val,
    unsigned long mod);

int lookup_seasonal(
    rrd_t *rrd,
    unsigned long rra_idx,
    unsigned long rra_start,
    rrd_file_t *rrd_file,
    unsigned long offset,
    rrd_value_t **seasonal_coef)
{
    unsigned long pos_tmp;

    /* rra_ptr[].cur_row points to the rra row to be written; this function
     * reads cur_row + offset */
    unsigned long row_idx = rrd->rra_ptr[rra_idx].cur_row + offset;

    /* handle wrap around */
    if (row_idx >= rrd->rra_def[rra_idx].row_cnt)
        row_idx = row_idx % (rrd->rra_def[rra_idx].row_cnt);

    /* rra_start points to the appropriate rra block in the file */
    /* compute the pointer to the appropriate location in the file */
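    /* e.g. with ds_cnt == 2 and row_idx == 5, the coefficients live at
     * rra_start + 5 * 2 * sizeof(rrd_value_t); the values here are
     * hypothetical, for illustration only */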
    pos_tmp =
        rra_start +
        (row_idx) * (rrd->stat_head->ds_cnt) * sizeof(rrd_value_t);

    /* allocate memory if need be */
    if (*seasonal_coef == NULL)
        *seasonal_coef =
            (rrd_value_t *) malloc((rrd->stat_head->ds_cnt) *
                                   sizeof(rrd_value_t));
    if (*seasonal_coef == NULL) {
        rrd_set_error("memory allocation failure: seasonal coef");
        return -1;
    }

    if (!rrd_seek(rrd_file, pos_tmp, SEEK_SET)) {
        if (rrd_read
            (rrd_file, *seasonal_coef,
             sizeof(rrd_value_t) * rrd->stat_head->ds_cnt)
            == (ssize_t) (sizeof(rrd_value_t) * rrd->stat_head->ds_cnt)) {
            /* success! */
            /* we can safely ignore the rule requiring a seek operation
             * between read and write, because this read moves the file
             * pointer to somewhere in the file other than the next write
             * location. */
            return 0;
        } else {
            rrd_set_error("read operation failed in lookup_seasonal(): %lu\n",
                          pos_tmp);
        }
    } else {
        rrd_set_error("seek operation failed in lookup_seasonal(): %lu\n",
                      pos_tmp);
    }

    return -1;
}

/* For the specified CDP prep area and the FAILURES RRA,
 * erase all history of past violations.
 */
void erase_violations(
    rrd_t *rrd,
    unsigned long cdp_idx,
    unsigned long rra_idx)
{
    unsigned short i;
    char *violations_array;

    /* check that rra_idx is a CF_FAILURES array */
    if (cf_conv(rrd->rra_def[rra_idx].cf_nam) != CF_FAILURES) {
#ifdef DEBUG
        fprintf(stderr, "erase_violations called for non-FAILURES RRA: %s\n",
                rrd->rra_def[rra_idx].cf_nam);
#endif
        return;
    }
#ifdef DEBUG
    fprintf(stderr, "scratch buffer before erase:\n");
    for (i = 0; i < MAX_CDP_PAR_EN; i++) {
        fprintf(stderr, "%lu ", rrd->cdp_prep[cdp_idx].scratch[i].u_cnt);
    }
    fprintf(stderr, "\n");
#endif

    /* WARNING: an array of longs on disk is treated as an array of chars
     * in memory. */
    violations_array = (char *) ((void *) rrd->cdp_prep[cdp_idx].scratch);
    /* erase everything in the part of the CDP scratch array that will be
     * used to store violations for the current window */
    for (i = rrd->rra_def[rra_idx].par[RRA_window_len].u_cnt; i > 0; i--) {
        violations_array[i - 1] = 0;
    }
#ifdef DEBUG
    fprintf(stderr, "scratch buffer after erase:\n");
    for (i = 0; i < MAX_CDP_PAR_EN; i++) {
        fprintf(stderr, "%lu ", rrd->cdp_prep[cdp_idx].scratch[i].u_cnt);
    }
    fprintf(stderr, "\n");
#endif
}

/* Smooth a periodic array with a moving average: equal weights, with a
 * window length that defaults to 5% of the period (version 4+ files take it
 * from the RRA's seasonal smoothing window parameter instead). */
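/* The smoother keeps a running sum over a window of 2*offset+1 points
 * centered on each row: every step adds the leading sample, writes the
 * average at the window's center, then drops the trailing sample via a
 * FIFO queue, so each row costs O(1) regardless of window size. */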
int apply_smoother(
    rrd_t *rrd,
    unsigned long rra_idx,
    unsigned long rra_start,
    rrd_file_t *rrd_file)
{
    unsigned long i, j, k;
    unsigned long totalbytes;
    rrd_value_t *rrd_values;
    unsigned long row_length = rrd->stat_head->ds_cnt;
    unsigned long row_count = rrd->rra_def[rra_idx].row_cnt;
    unsigned long offset;
    FIFOqueue **buffers;
    rrd_value_t *working_average;
    rrd_value_t *baseline;

    if (atoi(rrd->stat_head->version) >= 4) {
        offset = floor(rrd->rra_def[rra_idx].
                       par[RRA_seasonal_smoothing_window].
                       u_val / 2 * row_count);
    } else {
        offset = floor(0.05 / 2 * row_count);
    }

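    /* offset is half the window width: e.g. row_count == 288 with the
     * default 0.05 smoothing window gives offset = floor(0.025 * 288) = 7,
     * i.e. a 15-point window (illustrative numbers) */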
    if (offset == 0)
        return 0;       /* no smoothing */

    /* allocate memory */
    totalbytes = sizeof(rrd_value_t) * row_length * row_count;
    rrd_values = (rrd_value_t *) malloc(totalbytes);
    if (rrd_values == NULL) {
        rrd_set_error("apply smoother: memory allocation failure");
        return -1;
    }

    /* rra_start is at the beginning of this rra */
    if (rrd_seek(rrd_file, rra_start, SEEK_SET)) {
        rrd_set_error("seek to rra %lu failed", rra_start);
        free(rrd_values);
        return -1;
    }
    rrd_flush(rrd_file);
    /* could read all data in a single block, but we need to
     * check for NA values */
    for (i = 0; i < row_count; ++i) {
        for (j = 0; j < row_length; ++j) {
            if (rrd_read
                (rrd_file, &(rrd_values[i * row_length + j]),
                 sizeof(rrd_value_t) * 1)
                != (ssize_t) (sizeof(rrd_value_t) * 1)) {
                rrd_set_error("reading value failed: %s",
                              rrd_strerror(errno));
                free(rrd_values);
                return -1;
            }
            if (isnan(rrd_values[i * row_length + j])) {
                /* can't apply smoothing, still uninitialized values */
#ifdef DEBUG
                fprintf(stderr,
                        "apply_smoother: NA detected in seasonal array: %lu %lu\n",
                        i, j);
#endif
                free(rrd_values);
                return 0;
            }
        }
    }

    /* allocate queues, one for each data source */
    buffers = (FIFOqueue **) malloc(sizeof(FIFOqueue *) * row_length);
    if (buffers == NULL) {
        rrd_set_error("apply smoother: memory allocation failure");
        free(rrd_values);
        return -1;
    }
    for (i = 0; i < row_length; ++i) {
        queue_alloc(&(buffers[i]), 2 * offset + 1);
    }
    /* need working average initialized to 0 */
    working_average = (rrd_value_t *) calloc(row_length, sizeof(rrd_value_t));
    baseline = (rrd_value_t *) calloc(row_length, sizeof(rrd_value_t));

    /* compute sums of the first 2*offset terms */
    for (i = 0; i < 2 * offset; ++i) {
        k = MyMod(i - offset, row_count);
        for (j = 0; j < row_length; ++j) {
            queue_push(buffers[j], rrd_values[k * row_length + j]);
            working_average[j] += rrd_values[k * row_length + j];
        }
    }

    /* compute moving averages */
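    /* invariant: entering iteration i, working_average[j] holds the sum of
     * the 2*offset samples preceding the window center; adding sample i
     * completes the (2*offset + 1)-sample window, the average is written at
     * the center row, and the oldest sample is popped to keep the running
     * sum current for the next step */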
    for (i = offset; i < row_count + offset; ++i) {
        for (j = 0; j < row_length; ++j) {
            k = MyMod(i, row_count);
            /* add a term to the sum */
            working_average[j] += rrd_values[k * row_length + j];
            queue_push(buffers[j], rrd_values[k * row_length + j]);

            /* reset k to be the center of the window */
            k = MyMod(i - offset, row_count);
            /* overwrite the rrd_values entry; the old value is already
             * saved in buffers */
            rrd_values[k * row_length + j] =
                working_average[j] / (2 * offset + 1);
            baseline[j] += rrd_values[k * row_length + j];

            /* remove a term from the sum */
            working_average[j] -= queue_pop(buffers[j]);
        }
    }

    for (i = 0; i < row_length; ++i) {
        queue_dealloc(buffers[i]);
        baseline[i] /= row_count;
    }
    free(buffers);
    free(working_average);

    if (cf_conv(rrd->rra_def[rra_idx].cf_nam) == CF_SEASONAL) {
        rrd_value_t (
            *init_seasonality) (
            rrd_value_t seasonal_coef,
            rrd_value_t intercept);

        switch (cf_conv(rrd->rra_def[hw_dep_idx(rrd, rra_idx)].cf_nam)) {
        case CF_HWPREDICT:
            init_seasonality = hw_additive_init_seasonality;
            break;
        case CF_MHWPREDICT:
            init_seasonality = hw_multiplicative_init_seasonality;
            break;
        default:
            rrd_set_error("apply smoother: SEASONAL rra doesn't have "
                          "valid dependency: %s",
                          rrd->rra_def[hw_dep_idx(rrd, rra_idx)].cf_nam);
            free(rrd_values);
            free(baseline);
            return -1;
        }

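        /* initialize the seasonal coefficients from the smoothed values:
         * for additive Holt-Winters the coefficient is the deviation from
         * the per-DS baseline, for multiplicative Holt-Winters it is the
         * ratio to the baseline (see the hw_*_init_seasonality routines in
         * rrd_hw_math.c) */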
        for (j = 0; j < row_length; ++j) {
            for (i = 0; i < row_count; ++i) {
                rrd_values[i * row_length + j] =
                    init_seasonality(rrd_values[i * row_length + j],
                                     baseline[j]);
            }
            /* update the baseline coefficient,
             * first, compute the cdp_index. */
            offset = hw_dep_idx(rrd, rra_idx) * row_length + j;
            (rrd->cdp_prep[offset]).scratch[CDP_hw_intercept].u_val +=
                baseline[j];
        }
        /* flush cdp to disk */
        rrd_flush(rrd_file);
        if (rrd_seek(rrd_file, sizeof(stat_head_t) +
                     rrd->stat_head->ds_cnt * sizeof(ds_def_t) +
                     rrd->stat_head->rra_cnt * sizeof(rra_def_t) +
                     sizeof(live_head_t) +
                     rrd->stat_head->ds_cnt * sizeof(pdp_prep_t), SEEK_SET)) {
            rrd_set_error("apply_smoother: seek to cdp_prep failed");
            free(rrd_values);
            free(baseline);
            return -1;
        }
        if (rrd_write(rrd_file, rrd->cdp_prep,
                      sizeof(cdp_prep_t) *
                      (rrd->stat_head->rra_cnt) * rrd->stat_head->ds_cnt)
            != (ssize_t) (sizeof(cdp_prep_t) * (rrd->stat_head->rra_cnt) *
                          (rrd->stat_head->ds_cnt))) {
            rrd_set_error("apply_smoother: cdp_prep write failed");
            free(rrd_values);
            free(baseline);
            return -1;
        }
    }

    /* endif CF_SEASONAL */

    /* flush updated values to disk */
    rrd_flush(rrd_file);
    if (rrd_seek(rrd_file, rra_start, SEEK_SET)) {
        rrd_set_error("apply_smoother: seek to pos %lu failed", rra_start);
        free(rrd_values);
        free(baseline);
        return -1;
    }
    /* write as a single block */
    if (rrd_write
        (rrd_file, rrd_values, sizeof(rrd_value_t) * row_length * row_count)
        != (ssize_t) (sizeof(rrd_value_t) * row_length * row_count)) {
        rrd_set_error("apply_smoother: write failed to %lu", rra_start);
        free(rrd_values);
        free(baseline);
        return -1;
    }

    rrd_flush(rrd_file);
    free(rrd_values);
    free(baseline);
    return 0;
}

/* Reset aberrant behavior model coefficients, including intercept, slope,
 * seasonal, and seasonal deviation for the specified data source. */
void reset_aberrant_coefficients(
    rrd_t *rrd,
    rrd_file_t *rrd_file,
    unsigned long ds_idx)
{
    unsigned long cdp_idx, rra_idx, i;
    unsigned long cdp_start, rra_start;
    rrd_value_t nan_buffer = DNAN;

    /* compute the offset for the cdp area */
    cdp_start = sizeof(stat_head_t) +
        rrd->stat_head->ds_cnt * sizeof(ds_def_t) +
        rrd->stat_head->rra_cnt * sizeof(rra_def_t) +
        sizeof(live_head_t) + rrd->stat_head->ds_cnt * sizeof(pdp_prep_t);
    /* compute the offset for the first rra */
    rra_start = cdp_start +
        (rrd->stat_head->ds_cnt) * (rrd->stat_head->rra_cnt) *
        sizeof(cdp_prep_t) + rrd->stat_head->rra_cnt * sizeof(rra_ptr_t);
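    /* on-disk layout assumed by the offsets above:
     * stat_head | ds_def[ds_cnt] | rra_def[rra_cnt] | live_head |
     * pdp_prep[ds_cnt] | cdp_prep[rra_cnt * ds_cnt] | rra_ptr[rra_cnt] |
     * rra data blocks */
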
    /* loop over the RRAs */
    for (rra_idx = 0; rra_idx < rrd->stat_head->rra_cnt; rra_idx++) {
        cdp_idx = rra_idx * (rrd->stat_head->ds_cnt) + ds_idx;
        switch (cf_conv(rrd->rra_def[rra_idx].cf_nam)) {
        case CF_HWPREDICT:
        case CF_MHWPREDICT:
            init_hwpredict_cdp(&(rrd->cdp_prep[cdp_idx]));
            break;
        case CF_SEASONAL:
        case CF_DEVSEASONAL:
            /* don't use init_seasonal because it will reset burn-in, which
             * means different data sources will be calling for the smoother
             * at different times. */
            rrd->cdp_prep[cdp_idx].scratch[CDP_hw_seasonal].u_val = DNAN;
            rrd->cdp_prep[cdp_idx].scratch[CDP_hw_last_seasonal].u_val = DNAN;
            /* move to first entry of data source for this rra */
            rrd_seek(rrd_file, rra_start + ds_idx * sizeof(rrd_value_t),
                     SEEK_SET);
            /* entries for the same data source are not contiguous,
             * temporal entries are contiguous */
            for (i = 0; i < rrd->rra_def[rra_idx].row_cnt; ++i) {
                if (rrd_write(rrd_file, &nan_buffer, sizeof(rrd_value_t) * 1)
                    != (ssize_t) (sizeof(rrd_value_t) * 1)) {
                    rrd_set_error
                        ("reset_aberrant_coefficients: write failed data source %lu rra %s",
                         ds_idx, rrd->rra_def[rra_idx].cf_nam);
                    return;
                }
                rrd_seek(rrd_file, (rrd->stat_head->ds_cnt - 1) *
                         sizeof(rrd_value_t), SEEK_CUR);
            }
            break;
        case CF_FAILURES:
            erase_violations(rrd, cdp_idx, rra_idx);
            break;
        default:
            break;
        }
        /* move offset to the next rra */
        rra_start += rrd->rra_def[rra_idx].row_cnt * rrd->stat_head->ds_cnt *
            sizeof(rrd_value_t);
    }
    rrd_seek(rrd_file, cdp_start, SEEK_SET);
    if (rrd_write(rrd_file, rrd->cdp_prep,
                  sizeof(cdp_prep_t) *
                  (rrd->stat_head->rra_cnt) * rrd->stat_head->ds_cnt)
        != (ssize_t) (sizeof(cdp_prep_t) * (rrd->stat_head->rra_cnt) *
                      (rrd->stat_head->ds_cnt))) {
        rrd_set_error("reset_aberrant_coefficients: cdp_prep write failed");
    }
}

void init_hwpredict_cdp(
    cdp_prep_t *cdp)
{
    cdp->scratch[CDP_hw_intercept].u_val = DNAN;
    cdp->scratch[CDP_hw_last_intercept].u_val = DNAN;
    cdp->scratch[CDP_hw_slope].u_val = DNAN;
    cdp->scratch[CDP_hw_last_slope].u_val = DNAN;
    cdp->scratch[CDP_null_count].u_cnt = 1;
    cdp->scratch[CDP_last_null_count].u_cnt = 1;
}

void init_seasonal_cdp(
    cdp_prep_t *cdp)
{
    cdp->scratch[CDP_hw_seasonal].u_val = DNAN;
    cdp->scratch[CDP_hw_last_seasonal].u_val = DNAN;
    cdp->scratch[CDP_init_seasonal].u_cnt = 1;
}
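
/* Store the current PDP value in the CDP scratch area, then dispatch to the
 * update routine for this consolidation function. Additive vs.
 * multiplicative Holt-Winters is selected by passing the matching
 * hw_functions_t vtable, keyed off the CF of the RRA this one depends on. */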
int update_aberrant_CF(
    rrd_t *rrd,
    rrd_value_t pdp_val,
    enum cf_en current_cf,
    unsigned long cdp_idx,
    unsigned long rra_idx,
    unsigned long ds_idx,
    unsigned short CDP_scratch_idx,
    rrd_value_t *seasonal_coef)
{
    static hw_functions_t hw_multiplicative_functions = {
        hw_multiplicative_calculate_prediction,
        hw_multiplicative_calculate_intercept,
        hw_calculate_slope,
        hw_multiplicative_calculate_seasonality,
        hw_multiplicative_init_seasonality,
        hw_calculate_seasonal_deviation,
        hw_init_seasonal_deviation,
        1.0             /* identity value */
    };

    static hw_functions_t hw_additive_functions = {
        hw_additive_calculate_prediction,
        hw_additive_calculate_intercept,
        hw_calculate_slope,
        hw_additive_calculate_seasonality,
        hw_additive_init_seasonality,
        hw_calculate_seasonal_deviation,
        hw_init_seasonal_deviation,
        0.0             /* identity value */
    };

    rrd->cdp_prep[cdp_idx].scratch[CDP_scratch_idx].u_val = pdp_val;
    switch (current_cf) {
    case CF_HWPREDICT:
        return update_hwpredict(rrd, cdp_idx, rra_idx, ds_idx,
                                CDP_scratch_idx, &hw_additive_functions);
    case CF_MHWPREDICT:
        return update_hwpredict(rrd, cdp_idx, rra_idx, ds_idx,
                                CDP_scratch_idx,
                                &hw_multiplicative_functions);
    case CF_DEVPREDICT:
        return update_devpredict(rrd, cdp_idx, rra_idx, ds_idx,
                                 CDP_scratch_idx);
    case CF_SEASONAL:
        switch (cf_conv(rrd->rra_def[hw_dep_idx(rrd, rra_idx)].cf_nam)) {
        case CF_HWPREDICT:
            return update_seasonal(rrd, cdp_idx, rra_idx, ds_idx,
                                   CDP_scratch_idx, seasonal_coef,
                                   &hw_additive_functions);
        case CF_MHWPREDICT:
            return update_seasonal(rrd, cdp_idx, rra_idx, ds_idx,
                                   CDP_scratch_idx, seasonal_coef,
                                   &hw_multiplicative_functions);
        default:
            return -1;
        }
    case CF_DEVSEASONAL:
        switch (cf_conv(rrd->rra_def[hw_dep_idx(rrd, rra_idx)].cf_nam)) {
        case CF_HWPREDICT:
            return update_devseasonal(rrd, cdp_idx, rra_idx, ds_idx,
                                      CDP_scratch_idx, seasonal_coef,
                                      &hw_additive_functions);
        case CF_MHWPREDICT:
            return update_devseasonal(rrd, cdp_idx, rra_idx, ds_idx,
                                      CDP_scratch_idx, seasonal_coef,
                                      &hw_multiplicative_functions);
        default:
            return -1;
        }
    case CF_FAILURES:
        /* a FAILURES rra depends on a DEVSEASONAL rra, which in turn
         * depends on the HWPREDICT/MHWPREDICT rra, hence the double
         * indirection */
        switch (cf_conv
                (rrd->rra_def[hw_dep_idx(rrd, hw_dep_idx(rrd, rra_idx))].
                 cf_nam)) {
        case CF_HWPREDICT:
            return update_failures(rrd, cdp_idx, rra_idx, ds_idx,
                                   CDP_scratch_idx, &hw_additive_functions);
        case CF_MHWPREDICT:
            return update_failures(rrd, cdp_idx, rra_idx, ds_idx,
                                   CDP_scratch_idx,
                                   &hw_multiplicative_functions);
        default:
            return -1;
        }
    case CF_AVERAGE:
    default:
        return 0;
    }
    return -1;
}
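
/* Modulus that maps any signed value into [0, mod-1], unlike C's % operator,
 * which can return negative remainders: MyMod(-3, 10) == 7, whereas
 * -3 % 10 == -3. Used to wrap window indices around the period. */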
static unsigned long MyMod(
    signed long val,
    unsigned long mod)
{
    unsigned long new_val;

    if (val < 0)
        new_val = ((unsigned long) labs(val)) % mod;
    else
        new_val = (val % mod);

    /* the new_val > 0 guard keeps exact multiples of mod from yielding an
     * out-of-range result of mod instead of 0 */
    if (val < 0 && new_val > 0)
        return (mod - new_val);
    else
        return (new_val);
}

/* A standard fixed-capacity FIFO queue implementation.
 * No overflow checking is performed. */
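/* The queue is a ring buffer: head is the next index to pop (stored
 * initially as capacity and normalized modulo capacity on pop) and tail is
 * the next index to push. Note that a queue holding exactly capacity
 * elements is indistinguishable from an empty one via queue_isempty();
 * apply_smoother sidesteps this by never querying emptiness and balancing
 * each push with a pop once the window is primed. */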
int queue_alloc(
    FIFOqueue **q,
    int capacity)
{
    *q = (FIFOqueue *) malloc(sizeof(FIFOqueue));
    if (*q == NULL)
        return -1;
    (*q)->queue = (rrd_value_t *) malloc(sizeof(rrd_value_t) * capacity);
    if ((*q)->queue == NULL) {
        free(*q);
        return -1;
    }
    (*q)->capacity = capacity;
    (*q)->head = capacity;
    (*q)->tail = 0;
    return 0;
}

int queue_isempty(
    FIFOqueue *q)
{
    return (q->head % q->capacity == q->tail);
}

void queue_push(
    FIFOqueue *q,
    rrd_value_t value)
{
    q->queue[(q->tail)++] = value;
    q->tail = q->tail % q->capacity;
}

rrd_value_t queue_pop(
    FIFOqueue *q)
{
    q->head = q->head % q->capacity;
    return q->queue[(q->head)++];
}

void queue_dealloc(
    FIFOqueue *q)
{
    free(q->queue);
    free(q);
}
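
#ifdef DEBUG
/* A minimal sketch exercising the FIFO queue above; queue_demo is a
 * hypothetical helper added for illustration only and is not part of the
 * RRDtool API. It relies solely on the push/pop semantics implemented
 * above. */
static void queue_demo(void)
{
    FIFOqueue *q = NULL;

    if (queue_alloc(&q, 3) != 0)
        return;
    /* keep occupancy below capacity so queue_isempty() stays meaningful */
    queue_push(q, 1.0);
    queue_push(q, 2.0);
    /* pops return values in insertion order: 1.0, then 2.0 */
    while (!queue_isempty(q))
        fprintf(stderr, "popped %f\n", queue_pop(q));
    queue_dealloc(q);
}
#endif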