1 /*****************************************************************************
2 * RRDtool 1.2.23 Copyright by Tobi Oetiker, 1997-2007
3 *****************************************************************************
4 * rrd_hw.c : Support for Holt-Winters Smoothing/ Aberrant Behavior Detection
5 *****************************************************************************
6 * Initial version by Jake Brutlag, WebTV Networks, 5/1/00
7 *****************************************************************************/
9 #include "rrd_tool.h"
10 #include "rrd_hw.h"
12 /* #define DEBUG */
14 /* private functions */
15 unsigned long MyMod(
16 signed long val,
17 unsigned long mod);
18 int update_hwpredict(
19 rrd_t *rrd,
20 unsigned long cdp_idx,
21 unsigned long rra_idx,
22 unsigned long ds_idx,
23 unsigned short CDP_scratch_idx);
24 int update_seasonal(
25 rrd_t *rrd,
26 unsigned long cdp_idx,
27 unsigned long rra_idx,
28 unsigned long ds_idx,
29 unsigned short CDP_scratch_idx,
30 rrd_value_t *seasonal_coef);
31 int update_devpredict(
32 rrd_t *rrd,
33 unsigned long cdp_idx,
34 unsigned long rra_idx,
35 unsigned long ds_idx,
36 unsigned short CDP_scratch_idx);
37 int update_devseasonal(
38 rrd_t *rrd,
39 unsigned long cdp_idx,
40 unsigned long rra_idx,
41 unsigned long ds_idx,
42 unsigned short CDP_scratch_idx,
43 rrd_value_t *seasonal_dev);
44 int update_failures(
45 rrd_t *rrd,
46 unsigned long cdp_idx,
47 unsigned long rra_idx,
48 unsigned long ds_idx,
49 unsigned short CDP_scratch_idx);
51 int update_hwpredict(
52 rrd_t *rrd,
53 unsigned long cdp_idx,
54 unsigned long rra_idx,
55 unsigned long ds_idx,
56 unsigned short CDP_scratch_idx)
57 {
58 rrd_value_t prediction, seasonal_coef;
59 unsigned long dependent_rra_idx, seasonal_cdp_idx;
60 unival *coefs = rrd->cdp_prep[cdp_idx].scratch;
61 rra_def_t *current_rra = &(rrd->rra_def[rra_idx]);
63 /* save coefficients from current prediction */
64 coefs[CDP_hw_last_intercept].u_val = coefs[CDP_hw_intercept].u_val;
65 coefs[CDP_hw_last_slope].u_val = coefs[CDP_hw_slope].u_val;
66 coefs[CDP_last_null_count].u_cnt = coefs[CDP_null_count].u_cnt;
68 /* retrieve the current seasonal coef */
69 dependent_rra_idx = current_rra->par[RRA_dependent_rra_idx].u_cnt;
70 seasonal_cdp_idx = dependent_rra_idx * (rrd->stat_head->ds_cnt) + ds_idx;
71 if (dependent_rra_idx < rra_idx)
72 seasonal_coef =
73 rrd->cdp_prep[seasonal_cdp_idx].scratch[CDP_hw_last_seasonal].
74 u_val;
75 else
76 seasonal_coef =
77 rrd->cdp_prep[seasonal_cdp_idx].scratch[CDP_hw_seasonal].u_val;
79 /* compute the prediction */
80 if (isnan(coefs[CDP_hw_intercept].u_val)
81 || isnan(coefs[CDP_hw_slope].u_val)
82 || isnan(seasonal_coef)) {
83 prediction = DNAN;
85 /* bootstrap initialization of slope and intercept */
86 if (isnan(coefs[CDP_hw_intercept].u_val) &&
87 !isnan(coefs[CDP_scratch_idx].u_val)) {
88 #ifdef DEBUG
89 fprintf(stderr, "Initialization of slope/intercept\n");
90 #endif
91 coefs[CDP_hw_intercept].u_val = coefs[CDP_scratch_idx].u_val;
92 coefs[CDP_hw_last_intercept].u_val = coefs[CDP_scratch_idx].u_val;
93 /* initialize the slope to 0 */
94 coefs[CDP_hw_slope].u_val = 0.0;
95 coefs[CDP_hw_last_slope].u_val = 0.0;
96 /* initialize null count to 1 */
97 coefs[CDP_null_count].u_cnt = 1;
98 coefs[CDP_last_null_count].u_cnt = 1;
99 }
100 /* if seasonal coefficient is NA, then don't update intercept, slope */
101 } else {
102 prediction = coefs[CDP_hw_intercept].u_val +
103 (coefs[CDP_hw_slope].u_val) * (coefs[CDP_null_count].u_cnt)
104 + seasonal_coef;
105 #ifdef DEBUG
106 fprintf(stderr, "computed prediction: %f\n", prediction);
107 #endif
108 if (isnan(coefs[CDP_scratch_idx].u_val)) {
109 /* NA value, no updates of intercept, slope;
110 * increment the null count */
111 (coefs[CDP_null_count].u_cnt)++;
112 } else {
113 #ifdef DEBUG
114 fprintf(stderr, "Updating intercept, slope\n");
115 #endif
116 /* update the intercept */
117 coefs[CDP_hw_intercept].u_val =
118 (current_rra->par[RRA_hw_alpha].u_val) *
119 (coefs[CDP_scratch_idx].u_val - seasonal_coef) + (1 -
120 current_rra->
121 par
122 [RRA_hw_alpha].
123 u_val) *
124 (coefs[CDP_hw_intercept].u_val +
125 (coefs[CDP_hw_slope].u_val) * (coefs[CDP_null_count].u_cnt));
126 /* update the slope */
127 coefs[CDP_hw_slope].u_val =
128 (current_rra->par[RRA_hw_beta].u_val) *
129 (coefs[CDP_hw_intercept].u_val -
130 coefs[CDP_hw_last_intercept].u_val) + (1 -
131 current_rra->
132 par[RRA_hw_beta].
133 u_val) *
134 (coefs[CDP_hw_slope].u_val);
135 /* reset the null count */
136 coefs[CDP_null_count].u_cnt = 1;
137 }
138 }
140 /* store the prediction for writing */
141 coefs[CDP_scratch_idx].u_val = prediction;
142 return 0;
143 }
145 int lookup_seasonal(
146 rrd_t *rrd,
147 unsigned long rra_idx,
148 unsigned long rra_start,
149 rrd_file_t *rrd_file,
150 unsigned long offset,
151 rrd_value_t **seasonal_coef)
152 {
153 unsigned long pos_tmp;
155 /* rra_ptr[].cur_row points to the rra row to be written; this function
156 * reads cur_row + offset */
157 unsigned long row_idx = rrd->rra_ptr[rra_idx].cur_row + offset;
159 /* handle wrap around */
160 if (row_idx >= rrd->rra_def[rra_idx].row_cnt)
161 row_idx = row_idx % (rrd->rra_def[rra_idx].row_cnt);
163 /* rra_start points to the appropriate rra block in the file */
164 /* compute the pointer to the appropriate location in the file */
165 pos_tmp =
166 rra_start +
167 (row_idx) * (rrd->stat_head->ds_cnt) * sizeof(rrd_value_t);
169 /* allocate memory if need be */
170 if (*seasonal_coef == NULL)
171 *seasonal_coef =
172 (rrd_value_t *) malloc((rrd->stat_head->ds_cnt) *
173 sizeof(rrd_value_t));
174 if (*seasonal_coef == NULL) {
175 rrd_set_error("memory allocation failure: seasonal coef");
176 return -1;
177 }
179 if (!rrd_seek(rrd_file, pos_tmp, SEEK_SET)) {
180 if (rrd_read
181 (rrd_file, *seasonal_coef,
182 sizeof(rrd_value_t) * rrd->stat_head->ds_cnt)
183 == (ssize_t) (sizeof(rrd_value_t) * rrd->stat_head->ds_cnt)) {
184 /* success! */
185 /* we can safely ignore the rule requiring a seek operation between read
186 * and write, because this read moves the file pointer to somewhere
187 * in the file other than the next write location.
188 * */
189 return 0;
190 } else {
191 rrd_set_error("read operation failed in lookup_seasonal(): %lu\n",
192 pos_tmp);
193 }
194 } else {
195 rrd_set_error("seek operation failed in lookup_seasonal(): %lu\n",
196 pos_tmp);
197 }
199 return -1;
200 }
202 int update_seasonal(
203 rrd_t *rrd,
204 unsigned long cdp_idx,
205 unsigned long rra_idx,
206 unsigned long ds_idx,
207 unsigned short CDP_scratch_idx,
208 rrd_value_t *seasonal_coef)
209 {
210 /* TODO: extract common if subblocks in the wake of I/O optimization */
211 rrd_value_t intercept, seasonal;
212 rra_def_t *current_rra = &(rrd->rra_def[rra_idx]);
213 rra_def_t *hw_rra =
214 &(rrd->rra_def[current_rra->par[RRA_dependent_rra_idx].u_cnt]);
215 /* obtain cdp_prep index for HWPREDICT */
216 unsigned long hw_cdp_idx = (current_rra->par[RRA_dependent_rra_idx].u_cnt)
217 * (rrd->stat_head->ds_cnt) + ds_idx;
218 unival *coefs = rrd->cdp_prep[hw_cdp_idx].scratch;
220 /* update seasonal coefficient in cdp prep areas */
221 seasonal = rrd->cdp_prep[cdp_idx].scratch[CDP_hw_seasonal].u_val;
222 rrd->cdp_prep[cdp_idx].scratch[CDP_hw_last_seasonal].u_val = seasonal;
223 rrd->cdp_prep[cdp_idx].scratch[CDP_hw_seasonal].u_val =
224 seasonal_coef[ds_idx];
226 /* update seasonal value for disk */
227 if (current_rra->par[RRA_dependent_rra_idx].u_cnt < rra_idx)
228 /* associated HWPREDICT has already been updated */
229 /* check for possible NA values */
230 if (isnan(rrd->cdp_prep[cdp_idx].scratch[CDP_scratch_idx].u_val)) {
231 /* no update, store the old value unchanged,
232 * doesn't matter if it is NA */
233 rrd->cdp_prep[cdp_idx].scratch[CDP_scratch_idx].u_val = seasonal;
234 } else if (isnan(coefs[CDP_hw_last_intercept].u_val)
235 || isnan(coefs[CDP_hw_last_slope].u_val)) {
236 /* this should never happen, as HWPREDICT was already updated */
237 rrd->cdp_prep[cdp_idx].scratch[CDP_scratch_idx].u_val = DNAN;
238 } else if (isnan(seasonal)) {
239 /* initialization: intercept is not currently being updated */
240 #ifdef DEBUG
241 fprintf(stderr, "Initialization of seasonal coef %lu\n",
242 rrd->rra_ptr[rra_idx].cur_row);
243 #endif
244 rrd->cdp_prep[cdp_idx].scratch[CDP_scratch_idx].u_val
245 -= coefs[CDP_hw_last_intercept].u_val;
246 } else {
247 intercept = coefs[CDP_hw_intercept].u_val;
248 #ifdef DEBUG
249 fprintf(stderr,
250 "Updating seasonal, params: gamma %f, new intercept %f, old seasonal %f\n",
251 current_rra->par[RRA_seasonal_gamma].u_val,
252 intercept, seasonal);
253 #endif
254 rrd->cdp_prep[cdp_idx].scratch[CDP_scratch_idx].u_val =
255 (current_rra->par[RRA_seasonal_gamma].u_val) *
256 (rrd->cdp_prep[cdp_idx].scratch[CDP_scratch_idx].u_val -
257 intercept) + (1 -
258 current_rra->par[RRA_seasonal_gamma].u_val) *
259 seasonal;
260 } else {
261 /* SEASONAL array is updated first, which means the new intercept
262 * hasn't be computed; so we compute it here. */
264 /* check for possible NA values */
265 if (isnan(rrd->cdp_prep[cdp_idx].scratch[CDP_scratch_idx].u_val)) {
266 /* no update, simple store the old value unchanged,
267 * doesn't matter if it is NA */
268 rrd->cdp_prep[cdp_idx].scratch[CDP_scratch_idx].u_val = seasonal;
269 } else if (isnan(coefs[CDP_hw_intercept].u_val)
270 || isnan(coefs[CDP_hw_slope].u_val)) {
271 /* Initialization of slope and intercept will occur.
272 * force seasonal coefficient to 0. */
273 rrd->cdp_prep[cdp_idx].scratch[CDP_scratch_idx].u_val = 0.0;
274 } else if (isnan(seasonal)) {
275 /* initialization: intercept will not be updated
276 * CDP_hw_intercept = CDP_hw_last_intercept; just need to
277 * subtract this baseline value. */
278 #ifdef DEBUG
279 fprintf(stderr, "Initialization of seasonal coef %lu\n",
280 rrd->rra_ptr[rra_idx].cur_row);
281 #endif
282 rrd->cdp_prep[cdp_idx].scratch[CDP_scratch_idx].u_val -=
283 coefs[CDP_hw_intercept].u_val;
284 } else {
285 /* Note that we must get CDP_scratch_idx from SEASONAL array, as CDP_scratch_idx
286 * for HWPREDICT array will be DNAN. */
287 intercept = (hw_rra->par[RRA_hw_alpha].u_val) *
288 (rrd->cdp_prep[cdp_idx].scratch[CDP_scratch_idx].u_val -
289 seasonal)
290 + (1 -
291 hw_rra->par[RRA_hw_alpha].u_val) *
292 (coefs[CDP_hw_intercept].u_val +
293 (coefs[CDP_hw_slope].u_val) * (coefs[CDP_null_count].u_cnt));
294 rrd->cdp_prep[cdp_idx].scratch[CDP_scratch_idx].u_val =
295 (current_rra->par[RRA_seasonal_gamma].u_val) *
296 (rrd->cdp_prep[cdp_idx].scratch[CDP_scratch_idx].u_val -
297 intercept) + (1 -
298 current_rra->par[RRA_seasonal_gamma].u_val) *
299 seasonal;
300 }
301 }
302 #ifdef DEBUG
303 fprintf(stderr, "seasonal coefficient set= %f\n",
304 rrd->cdp_prep[cdp_idx].scratch[CDP_scratch_idx].u_val);
305 #endif
306 return 0;
307 }
309 int update_devpredict(
310 rrd_t *rrd,
311 unsigned long cdp_idx,
312 unsigned long rra_idx,
313 unsigned long ds_idx,
314 unsigned short CDP_scratch_idx)
315 {
316 /* there really isn't any "update" here; the only reason this information
317 * is stored separately from DEVSEASONAL is to preserve deviation predictions
318 * for a longer duration than one seasonal cycle. */
319 unsigned long seasonal_cdp_idx =
320 (rrd->rra_def[rra_idx].par[RRA_dependent_rra_idx].u_cnt)
321 * (rrd->stat_head->ds_cnt) + ds_idx;
323 if (rrd->rra_def[rra_idx].par[RRA_dependent_rra_idx].u_cnt < rra_idx) {
324 /* associated DEVSEASONAL array already updated */
325 rrd->cdp_prep[cdp_idx].scratch[CDP_scratch_idx].u_val
326 =
327 rrd->cdp_prep[seasonal_cdp_idx].
328 scratch[CDP_last_seasonal_deviation].u_val;
329 } else {
330 /* associated DEVSEASONAL not yet updated */
331 rrd->cdp_prep[cdp_idx].scratch[CDP_scratch_idx].u_val
332 =
333 rrd->cdp_prep[seasonal_cdp_idx].scratch[CDP_seasonal_deviation].
334 u_val;
335 }
336 return 0;
337 }
339 int update_devseasonal(
340 rrd_t *rrd,
341 unsigned long cdp_idx,
342 unsigned long rra_idx,
343 unsigned long ds_idx,
344 unsigned short CDP_scratch_idx,
345 rrd_value_t *seasonal_dev)
346 {
347 rrd_value_t prediction = 0, seasonal_coef = DNAN;
348 rra_def_t *current_rra = &(rrd->rra_def[rra_idx]);
350 /* obtain cdp_prep index for HWPREDICT */
351 unsigned long hw_rra_idx = current_rra->par[RRA_dependent_rra_idx].u_cnt;
352 unsigned long hw_cdp_idx = hw_rra_idx * (rrd->stat_head->ds_cnt) + ds_idx;
353 unsigned long seasonal_cdp_idx;
354 unival *coefs = rrd->cdp_prep[hw_cdp_idx].scratch;
356 rrd->cdp_prep[cdp_idx].scratch[CDP_last_seasonal_deviation].u_val =
357 rrd->cdp_prep[cdp_idx].scratch[CDP_seasonal_deviation].u_val;
358 /* retrieve the next seasonal deviation value, could be NA */
359 rrd->cdp_prep[cdp_idx].scratch[CDP_seasonal_deviation].u_val =
360 seasonal_dev[ds_idx];
362 /* retrieve the current seasonal_coef (not to be confused with the
363 * current seasonal deviation). Could make this more readable by introducing
364 * some wrapper functions. */
365 seasonal_cdp_idx =
366 (rrd->rra_def[hw_rra_idx].par[RRA_dependent_rra_idx].u_cnt)
367 * (rrd->stat_head->ds_cnt) + ds_idx;
368 if (rrd->rra_def[hw_rra_idx].par[RRA_dependent_rra_idx].u_cnt < rra_idx)
369 /* SEASONAL array already updated */
370 seasonal_coef =
371 rrd->cdp_prep[seasonal_cdp_idx].scratch[CDP_hw_last_seasonal].
372 u_val;
373 else
374 /* SEASONAL array not yet updated */
375 seasonal_coef =
376 rrd->cdp_prep[seasonal_cdp_idx].scratch[CDP_hw_seasonal].u_val;
378 /* compute the abs value of the difference between the prediction and
379 * observed value */
380 if (hw_rra_idx < rra_idx) {
381 /* associated HWPREDICT has already been updated */
382 if (isnan(coefs[CDP_hw_last_intercept].u_val) ||
383 isnan(coefs[CDP_hw_last_slope].u_val) || isnan(seasonal_coef)) {
384 /* one of the prediction values is uinitialized */
385 rrd->cdp_prep[cdp_idx].scratch[CDP_scratch_idx].u_val = DNAN;
386 return 0;
387 } else {
388 prediction = coefs[CDP_hw_last_intercept].u_val +
389 (coefs[CDP_hw_last_slope].u_val) *
390 (coefs[CDP_last_null_count].u_cnt)
391 + seasonal_coef;
392 }
393 } else {
394 /* associated HWPREDICT has NOT been updated */
395 if (isnan(coefs[CDP_hw_intercept].u_val) ||
396 isnan(coefs[CDP_hw_slope].u_val) || isnan(seasonal_coef)) {
397 /* one of the prediction values is uinitialized */
398 rrd->cdp_prep[cdp_idx].scratch[CDP_scratch_idx].u_val = DNAN;
399 return 0;
400 } else {
401 prediction = coefs[CDP_hw_intercept].u_val +
402 (coefs[CDP_hw_slope].u_val) * (coefs[CDP_null_count].u_cnt)
403 + seasonal_coef;
404 }
405 }
407 if (isnan(rrd->cdp_prep[cdp_idx].scratch[CDP_scratch_idx].u_val)) {
408 /* no update, store existing value unchanged, doesn't
409 * matter if it is NA */
410 rrd->cdp_prep[cdp_idx].scratch[CDP_scratch_idx].u_val =
411 rrd->cdp_prep[cdp_idx].scratch[CDP_last_seasonal_deviation].u_val;
412 } else
413 if (isnan
414 (rrd->cdp_prep[cdp_idx].scratch[CDP_last_seasonal_deviation].
415 u_val)) {
416 /* initialization */
417 #ifdef DEBUG
418 fprintf(stderr, "Initialization of seasonal deviation\n");
419 #endif
420 rrd->cdp_prep[cdp_idx].scratch[CDP_scratch_idx].u_val =
421 fabs(prediction -
422 rrd->cdp_prep[cdp_idx].scratch[CDP_scratch_idx].u_val);
423 } else {
424 /* exponential smoothing update */
425 rrd->cdp_prep[cdp_idx].scratch[CDP_scratch_idx].u_val =
426 (rrd->rra_def[rra_idx].par[RRA_seasonal_gamma].u_val) *
427 fabs(prediction -
428 rrd->cdp_prep[cdp_idx].scratch[CDP_scratch_idx].u_val)
429 + (1 -
430 rrd->rra_def[rra_idx].par[RRA_seasonal_gamma].u_val) *
431 (rrd->cdp_prep[cdp_idx].scratch[CDP_last_seasonal_deviation].
432 u_val);
433 }
434 return 0;
435 }
437 /* Check for a failure based on a threshold # of violations within the specified
438 * window. */
439 int update_failures(
440 rrd_t *rrd,
441 unsigned long cdp_idx,
442 unsigned long rra_idx,
443 unsigned long ds_idx,
444 unsigned short CDP_scratch_idx)
445 {
446 /* detection of a violation depends on 3 RRAs:
447 * HWPREDICT, SEASONAL, and DEVSEASONAL */
448 rra_def_t *current_rra = &(rrd->rra_def[rra_idx]);
449 unsigned long dev_rra_idx = current_rra->par[RRA_dependent_rra_idx].u_cnt;
450 rra_def_t *dev_rra = &(rrd->rra_def[dev_rra_idx]);
451 unsigned long hw_rra_idx = dev_rra->par[RRA_dependent_rra_idx].u_cnt;
452 rra_def_t *hw_rra = &(rrd->rra_def[hw_rra_idx]);
453 unsigned long seasonal_rra_idx = hw_rra->par[RRA_dependent_rra_idx].u_cnt;
454 unsigned long temp_cdp_idx;
455 rrd_value_t deviation = DNAN;
456 rrd_value_t seasonal_coef = DNAN;
457 rrd_value_t prediction = DNAN;
458 char violation = 0;
459 unsigned short violation_cnt = 0, i;
460 char *violations_array;
462 /* usual checks to determine the order of the RRAs */
463 temp_cdp_idx = dev_rra_idx * (rrd->stat_head->ds_cnt) + ds_idx;
464 if (rra_idx < seasonal_rra_idx) {
465 /* DEVSEASONAL not yet updated */
466 deviation =
467 rrd->cdp_prep[temp_cdp_idx].scratch[CDP_seasonal_deviation].u_val;
468 } else {
469 /* DEVSEASONAL already updated */
470 deviation =
471 rrd->cdp_prep[temp_cdp_idx].scratch[CDP_last_seasonal_deviation].
472 u_val;
473 }
474 if (!isnan(deviation)) {
476 temp_cdp_idx = seasonal_rra_idx * (rrd->stat_head->ds_cnt) + ds_idx;
477 if (rra_idx < seasonal_rra_idx) {
478 /* SEASONAL not yet updated */
479 seasonal_coef =
480 rrd->cdp_prep[temp_cdp_idx].scratch[CDP_hw_seasonal].u_val;
481 } else {
482 /* SEASONAL already updated */
483 seasonal_coef =
484 rrd->cdp_prep[temp_cdp_idx].scratch[CDP_hw_last_seasonal].
485 u_val;
486 }
487 /* in this code block, we know seasonal coef is not DNAN, because deviation is not
488 * null */
490 temp_cdp_idx = hw_rra_idx * (rrd->stat_head->ds_cnt) + ds_idx;
491 if (rra_idx < hw_rra_idx) {
492 /* HWPREDICT not yet updated */
493 prediction =
494 rrd->cdp_prep[temp_cdp_idx].scratch[CDP_hw_intercept].u_val +
495 (rrd->cdp_prep[temp_cdp_idx].scratch[CDP_hw_slope].u_val)
496 * (rrd->cdp_prep[temp_cdp_idx].scratch[CDP_null_count].u_cnt)
497 + seasonal_coef;
498 } else {
499 /* HWPREDICT already updated */
500 prediction =
501 rrd->cdp_prep[temp_cdp_idx].scratch[CDP_hw_last_intercept].
502 u_val +
503 (rrd->cdp_prep[temp_cdp_idx].scratch[CDP_hw_last_slope].u_val)
504 *
505 (rrd->cdp_prep[temp_cdp_idx].scratch[CDP_last_null_count].
506 u_cnt)
507 + seasonal_coef;
508 }
510 /* determine if the observed value is a violation */
511 if (!isnan(rrd->cdp_prep[cdp_idx].scratch[CDP_scratch_idx].u_val)) {
512 if (rrd->cdp_prep[cdp_idx].scratch[CDP_scratch_idx].u_val >
513 prediction +
514 (current_rra->par[RRA_delta_pos].u_val) * deviation
515 || rrd->cdp_prep[cdp_idx].scratch[CDP_scratch_idx].u_val <
516 prediction -
517 (current_rra->par[RRA_delta_neg].u_val) * deviation)
518 violation = 1;
519 } else {
520 violation = 1; /* count DNAN values as violations */
521 }
523 }
525 /* determine if a failure has occurred and update the failure array */
526 violation_cnt = violation;
527 violations_array = (char *) ((void *) rrd->cdp_prep[cdp_idx].scratch);
528 for (i = current_rra->par[RRA_window_len].u_cnt; i > 1; i--) {
529 /* shift */
530 violations_array[i - 1] = violations_array[i - 2];
531 violation_cnt += violations_array[i - 1];
532 }
533 violations_array[0] = violation;
535 if (violation_cnt < current_rra->par[RRA_failure_threshold].u_cnt)
536 /* not a failure */
537 rrd->cdp_prep[cdp_idx].scratch[CDP_scratch_idx].u_val = 0.0;
538 else
539 rrd->cdp_prep[cdp_idx].scratch[CDP_scratch_idx].u_val = 1.0;
541 return (rrd->cdp_prep[cdp_idx].scratch[CDP_scratch_idx].u_val);
542 }
544 /* For the specified CDP prep area and the FAILURES RRA,
545 * erase all history of past violations.
546 */
547 void erase_violations(
548 rrd_t *rrd,
549 unsigned long cdp_idx,
550 unsigned long rra_idx)
551 {
552 unsigned short i;
553 char *violations_array;
555 /* check that rra_idx is a CF_FAILURES array */
556 if (cf_conv(rrd->rra_def[rra_idx].cf_nam) != CF_FAILURES) {
557 #ifdef DEBUG
558 fprintf(stderr, "erase_violations called for non-FAILURES RRA: %s\n",
559 rrd->rra_def[rra_idx].cf_nam);
560 #endif
561 return;
562 }
563 #ifdef DEBUG
564 fprintf(stderr, "scratch buffer before erase:\n");
565 for (i = 0; i < MAX_CDP_PAR_EN; i++) {
566 fprintf(stderr, "%lu ", rrd->cdp_prep[cdp_idx].scratch[i].u_cnt);
567 }
568 fprintf(stderr, "\n");
569 #endif
571 /* WARNING: an array of longs on disk is treated as an array of chars
572 * in memory. */
573 violations_array = (char *) ((void *) rrd->cdp_prep[cdp_idx].scratch);
574 /* erase everything in the part of the CDP scratch array that will be
575 * used to store violations for the current window */
576 for (i = rrd->rra_def[rra_idx].par[RRA_window_len].u_cnt; i > 0; i--) {
577 violations_array[i - 1] = 0;
578 }
579 #ifdef DEBUG
580 fprintf(stderr, "scratch buffer after erase:\n");
581 for (i = 0; i < MAX_CDP_PAR_EN; i++) {
582 fprintf(stderr, "%lu ", rrd->cdp_prep[cdp_idx].scratch[i].u_cnt);
583 }
584 fprintf(stderr, "\n");
585 #endif
586 }
588 /* Smooth a periodic array with a moving average: equal weights and
589 * length = 5% of the period. */
590 int apply_smoother(
591 rrd_t *rrd,
592 unsigned long rra_idx,
593 unsigned long rra_start,
594 rrd_file_t *rrd_file)
595 {
596 unsigned long i, j, k;
597 unsigned long totalbytes;
598 rrd_value_t *rrd_values;
599 unsigned long row_length = rrd->stat_head->ds_cnt;
600 unsigned long row_count = rrd->rra_def[rra_idx].row_cnt;
601 unsigned long offset;
602 FIFOqueue **buffers;
603 rrd_value_t *working_average;
604 rrd_value_t *baseline;
606 offset = floor(0.025 * row_count);
607 if (offset == 0)
608 return 0; /* no smoothing */
610 /* allocate memory */
611 totalbytes = sizeof(rrd_value_t) * row_length * row_count;
612 rrd_values = (rrd_value_t *) malloc(totalbytes);
613 if (rrd_values == NULL) {
614 rrd_set_error("apply smoother: memory allocation failure");
615 return -1;
616 }
618 /* rra_start is at the beginning of this rra */
619 if (rrd_seek(rrd_file, rra_start, SEEK_SET)) {
620 rrd_set_error("seek to rra %d failed", rra_start);
621 free(rrd_values);
622 return -1;
623 }
624 rrd_flush(rrd_file);
625 /* could read all data in a single block, but we need to
626 * check for NA values */
627 for (i = 0; i < row_count; ++i) {
628 for (j = 0; j < row_length; ++j) {
629 if (rrd_read
630 (rrd_file, &(rrd_values[i * row_length + j]),
631 sizeof(rrd_value_t) * 1)
632 != (ssize_t) (sizeof(rrd_value_t) * 1)) {
633 rrd_set_error("reading value failed: %s",
634 rrd_strerror(errno));
635 }
636 if (isnan(rrd_values[i * row_length + j])) {
637 /* can't apply smoothing, still uninitialized values */
638 #ifdef DEBUG
639 fprintf(stderr,
640 "apply_smoother: NA detected in seasonal array: %ld %ld\n",
641 i, j);
642 #endif
643 free(rrd_values);
644 return 0;
645 }
646 }
647 }
649 /* allocate queues, one for each data source */
650 buffers = (FIFOqueue **) malloc(sizeof(FIFOqueue *) * row_length);
651 for (i = 0; i < row_length; ++i) {
652 queue_alloc(&(buffers[i]), 2 * offset + 1);
653 }
654 /* need working average initialized to 0 */
655 working_average = (rrd_value_t *) calloc(row_length, sizeof(rrd_value_t));
656 baseline = (rrd_value_t *) calloc(row_length, sizeof(rrd_value_t));
658 /* compute sums of the first 2*offset terms */
659 for (i = 0; i < 2 * offset; ++i) {
660 k = MyMod(i - offset, row_count);
661 for (j = 0; j < row_length; ++j) {
662 queue_push(buffers[j], rrd_values[k * row_length + j]);
663 working_average[j] += rrd_values[k * row_length + j];
664 }
665 }
667 /* compute moving averages */
668 for (i = offset; i < row_count + offset; ++i) {
669 for (j = 0; j < row_length; ++j) {
670 k = MyMod(i, row_count);
671 /* add a term to the sum */
672 working_average[j] += rrd_values[k * row_length + j];
673 queue_push(buffers[j], rrd_values[k * row_length + j]);
675 /* reset k to be the center of the window */
676 k = MyMod(i - offset, row_count);
677 /* overwrite rdd_values entry, the old value is already
678 * saved in buffers */
679 rrd_values[k * row_length + j] =
680 working_average[j] / (2 * offset + 1);
681 baseline[j] += rrd_values[k * row_length + j];
683 /* remove a term from the sum */
684 working_average[j] -= queue_pop(buffers[j]);
685 }
686 }
688 for (i = 0; i < row_length; ++i) {
689 queue_dealloc(buffers[i]);
690 baseline[i] /= row_count;
691 }
692 free(buffers);
693 free(working_average);
695 if (cf_conv(rrd->rra_def[rra_idx].cf_nam) == CF_SEASONAL) {
696 for (j = 0; j < row_length; ++j) {
697 for (i = 0; i < row_count; ++i) {
698 rrd_values[i * row_length + j] -= baseline[j];
699 }
700 /* update the baseline coefficient,
701 * first, compute the cdp_index. */
702 offset = (rrd->rra_def[rra_idx].par[RRA_dependent_rra_idx].u_cnt)
703 * row_length + j;
704 (rrd->cdp_prep[offset]).scratch[CDP_hw_intercept].u_val +=
705 baseline[j];
706 }
707 /* flush cdp to disk */
708 rrd_flush(rrd_file);
709 if (rrd_seek(rrd_file, sizeof(stat_head_t) +
710 rrd->stat_head->ds_cnt * sizeof(ds_def_t) +
711 rrd->stat_head->rra_cnt * sizeof(rra_def_t) +
712 sizeof(live_head_t) +
713 rrd->stat_head->ds_cnt * sizeof(pdp_prep_t), SEEK_SET)) {
714 rrd_set_error("apply_smoother: seek to cdp_prep failed");
715 free(rrd_values);
716 return -1;
717 }
718 if (rrd_write(rrd_file, rrd->cdp_prep,
719 sizeof(cdp_prep_t) *
720 (rrd->stat_head->rra_cnt) * rrd->stat_head->ds_cnt)
721 != (ssize_t) (sizeof(cdp_prep_t) * (rrd->stat_head->rra_cnt) *
722 (rrd->stat_head->ds_cnt))) {
723 rrd_set_error("apply_smoother: cdp_prep write failed");
724 free(rrd_values);
725 return -1;
726 }
727 }
729 /* endif CF_SEASONAL */
730 /* flush updated values to disk */
731 rrd_flush(rrd_file);
732 if (rrd_seek(rrd_file, rra_start, SEEK_SET)) {
733 rrd_set_error("apply_smoother: seek to pos %d failed", rra_start);
734 free(rrd_values);
735 return -1;
736 }
737 /* write as a single block */
738 if (rrd_write
739 (rrd_file, rrd_values, sizeof(rrd_value_t) * row_length * row_count)
740 != (ssize_t) (sizeof(rrd_value_t) * row_length * row_count)) {
741 rrd_set_error("apply_smoother: write failed to %lu", rra_start);
742 free(rrd_values);
743 return -1;
744 }
746 rrd_flush(rrd_file);
747 free(rrd_values);
748 free(baseline);
749 return 0;
750 }
752 /* Reset aberrant behavior model coefficients, including intercept, slope,
753 * seasonal, and seasonal deviation for the specified data source. */
754 void reset_aberrant_coefficients(
755 rrd_t *rrd,
756 rrd_file_t *rrd_file,
757 unsigned long ds_idx)
758 {
759 unsigned long cdp_idx, rra_idx, i;
760 unsigned long cdp_start, rra_start;
761 rrd_value_t nan_buffer = DNAN;
763 /* compute the offset for the cdp area */
764 cdp_start = sizeof(stat_head_t) +
765 rrd->stat_head->ds_cnt * sizeof(ds_def_t) +
766 rrd->stat_head->rra_cnt * sizeof(rra_def_t) +
767 sizeof(live_head_t) + rrd->stat_head->ds_cnt * sizeof(pdp_prep_t);
768 /* compute the offset for the first rra */
769 rra_start = cdp_start +
770 (rrd->stat_head->ds_cnt) * (rrd->stat_head->rra_cnt) *
771 sizeof(cdp_prep_t) + rrd->stat_head->rra_cnt * sizeof(rra_ptr_t);
773 /* loop over the RRAs */
774 for (rra_idx = 0; rra_idx < rrd->stat_head->rra_cnt; rra_idx++) {
775 cdp_idx = rra_idx * (rrd->stat_head->ds_cnt) + ds_idx;
776 switch (cf_conv(rrd->rra_def[rra_idx].cf_nam)) {
777 case CF_HWPREDICT:
778 init_hwpredict_cdp(&(rrd->cdp_prep[cdp_idx]));
779 break;
780 case CF_SEASONAL:
781 case CF_DEVSEASONAL:
782 /* don't use init_seasonal because it will reset burn-in, which
783 * means different data sources will be calling for the smoother
784 * at different times. */
785 rrd->cdp_prep[cdp_idx].scratch[CDP_hw_seasonal].u_val = DNAN;
786 rrd->cdp_prep[cdp_idx].scratch[CDP_hw_last_seasonal].u_val = DNAN;
787 /* move to first entry of data source for this rra */
788 rrd_seek(rrd_file, rra_start + ds_idx * sizeof(rrd_value_t),
789 SEEK_SET);
790 /* entries for the same data source are not contiguous,
791 * temporal entries are contiguous */
792 for (i = 0; i < rrd->rra_def[rra_idx].row_cnt; ++i) {
793 if (rrd_write(rrd_file, &nan_buffer, sizeof(rrd_value_t) * 1)
794 != sizeof(rrd_value_t) * 1) {
795 rrd_set_error
796 ("reset_aberrant_coefficients: write failed data source %lu rra %s",
797 ds_idx, rrd->rra_def[rra_idx].cf_nam);
798 return;
799 }
800 rrd_seek(rrd_file, (rrd->stat_head->ds_cnt - 1) *
801 sizeof(rrd_value_t), SEEK_CUR);
802 }
803 break;
804 case CF_FAILURES:
805 erase_violations(rrd, cdp_idx, rra_idx);
806 break;
807 default:
808 break;
809 }
810 /* move offset to the next rra */
811 rra_start += rrd->rra_def[rra_idx].row_cnt * rrd->stat_head->ds_cnt *
812 sizeof(rrd_value_t);
813 }
814 rrd_seek(rrd_file, cdp_start, SEEK_SET);
815 if (rrd_write(rrd_file, rrd->cdp_prep,
816 sizeof(cdp_prep_t) *
817 (rrd->stat_head->rra_cnt) * rrd->stat_head->ds_cnt)
818 != (ssize_t) (sizeof(cdp_prep_t) * (rrd->stat_head->rra_cnt) *
819 (rrd->stat_head->ds_cnt))) {
820 rrd_set_error("reset_aberrant_coefficients: cdp_prep write failed");
821 }
822 }
824 void init_hwpredict_cdp(
825 cdp_prep_t *cdp)
826 {
827 cdp->scratch[CDP_hw_intercept].u_val = DNAN;
828 cdp->scratch[CDP_hw_last_intercept].u_val = DNAN;
829 cdp->scratch[CDP_hw_slope].u_val = DNAN;
830 cdp->scratch[CDP_hw_last_slope].u_val = DNAN;
831 cdp->scratch[CDP_null_count].u_cnt = 1;
832 cdp->scratch[CDP_last_null_count].u_cnt = 1;
833 }
835 void init_seasonal_cdp(
836 cdp_prep_t *cdp)
837 {
838 cdp->scratch[CDP_hw_seasonal].u_val = DNAN;
839 cdp->scratch[CDP_hw_last_seasonal].u_val = DNAN;
840 cdp->scratch[CDP_init_seasonal].u_cnt = 1;
841 }
843 int update_aberrant_CF(
844 rrd_t *rrd,
845 rrd_value_t pdp_val,
846 enum cf_en current_cf,
847 unsigned long cdp_idx,
848 unsigned long rra_idx,
849 unsigned long ds_idx,
850 unsigned short CDP_scratch_idx,
851 rrd_value_t *seasonal_coef)
852 {
853 rrd->cdp_prep[cdp_idx].scratch[CDP_scratch_idx].u_val = pdp_val;
854 switch (current_cf) {
855 case CF_AVERAGE:
856 default:
857 return 0;
858 case CF_HWPREDICT:
859 return update_hwpredict(rrd, cdp_idx, rra_idx, ds_idx,
860 CDP_scratch_idx);
861 case CF_DEVPREDICT:
862 return update_devpredict(rrd, cdp_idx, rra_idx, ds_idx,
863 CDP_scratch_idx);
864 case CF_SEASONAL:
865 return update_seasonal(rrd, cdp_idx, rra_idx, ds_idx, CDP_scratch_idx,
866 seasonal_coef);
867 case CF_DEVSEASONAL:
868 return update_devseasonal(rrd, cdp_idx, rra_idx, ds_idx,
869 CDP_scratch_idx, seasonal_coef);
870 case CF_FAILURES:
871 return update_failures(rrd, cdp_idx, rra_idx, ds_idx,
872 CDP_scratch_idx);
873 }
874 return -1;
875 }
877 unsigned long MyMod(
878 signed long val,
879 unsigned long mod)
880 {
881 unsigned long new_val;
883 if (val < 0)
884 new_val = ((unsigned long) abs(val)) % mod;
885 else
886 new_val = (val % mod);
888 if (val < 0)
889 return (mod - new_val);
890 else
891 return (new_val);
892 }
894 /* a standard fixed-capacity FIF0 queue implementation
895 * No overflow checking is performed. */
896 int queue_alloc(
897 FIFOqueue **q,
898 int capacity)
899 {
900 *q = (FIFOqueue *) malloc(sizeof(FIFOqueue));
901 if (*q == NULL)
902 return -1;
903 (*q)->queue = (rrd_value_t *) malloc(sizeof(rrd_value_t) * capacity);
904 if ((*q)->queue == NULL) {
905 free(*q);
906 return -1;
907 }
908 (*q)->capacity = capacity;
909 (*q)->head = capacity;
910 (*q)->tail = 0;
911 return 0;
912 }
914 int queue_isempty(
915 FIFOqueue *q)
916 {
917 return (q->head % q->capacity == q->tail);
918 }
920 void queue_push(
921 FIFOqueue *q,
922 rrd_value_t value)
923 {
924 q->queue[(q->tail)++] = value;
925 q->tail = q->tail % q->capacity;
926 }
928 rrd_value_t queue_pop(
929 FIFOqueue *q)
930 {
931 q->head = q->head % q->capacity;
932 return q->queue[(q->head)++];
933 }
935 void queue_dealloc(
936 FIFOqueue *q)
937 {
938 free(q->queue);
939 free(q);
940 }