9218f594ebae6d6da40f8af1a2063e85d85033db
1 /*****************************************************************************
2 * RRDtool 1.2.23 Copyright by Tobi Oetiker, 1997-2007
3 *****************************************************************************
4 * rrd_hw.c : Support for Holt-Winters Smoothing/ Aberrant Behavior Detection
5 *****************************************************************************
6 * Initial version by Jake Brutlag, WebTV Networks, 5/1/00
7 *****************************************************************************/
9 #include "rrd_tool.h"
10 #include "rrd_hw.h"
12 /* #define DEBUG */
14 /* private functions */
15 unsigned long MyMod(signed long val, unsigned long mod);
16 int update_hwpredict(rrd_t *rrd, unsigned long cdp_idx, unsigned long rra_idx,
17 unsigned long ds_idx, unsigned short CDP_scratch_idx);
18 int update_seasonal(rrd_t *rrd, unsigned long cdp_idx, unsigned long rra_idx,
19 unsigned long ds_idx, unsigned short CDP_scratch_idx,
20 rrd_value_t *seasonal_coef);
21 int update_devpredict(rrd_t *rrd, unsigned long cdp_idx,
22 unsigned long rra_idx, unsigned long ds_idx, unsigned short CDP_scratch_idx);
23 int update_devseasonal(rrd_t *rrd, unsigned long cdp_idx, unsigned long rra_idx,
24 unsigned long ds_idx, unsigned short CDP_scratch_idx,
25 rrd_value_t *seasonal_dev);
26 int update_failures(rrd_t *rrd, unsigned long cdp_idx, unsigned long rra_idx,
27 unsigned long ds_idx, unsigned short CDP_scratch_idx);
29 int
30 update_hwpredict(rrd_t *rrd, unsigned long cdp_idx, unsigned long rra_idx,
31 unsigned long ds_idx, unsigned short CDP_scratch_idx)
32 {
33 rrd_value_t prediction, seasonal_coef;
34 unsigned long dependent_rra_idx, seasonal_cdp_idx;
35 unival *coefs = rrd -> cdp_prep[cdp_idx].scratch;
36 rra_def_t *current_rra = &(rrd -> rra_def[rra_idx]);
38 /* save coefficients from current prediction */
39 coefs[CDP_hw_last_intercept].u_val = coefs[CDP_hw_intercept].u_val;
40 coefs[CDP_hw_last_slope].u_val = coefs[CDP_hw_slope].u_val;
41 coefs[CDP_last_null_count].u_cnt = coefs[CDP_null_count].u_cnt;
43 /* retrieve the current seasonal coef */
44 dependent_rra_idx = current_rra -> par[RRA_dependent_rra_idx].u_cnt;
45 seasonal_cdp_idx = dependent_rra_idx*(rrd -> stat_head -> ds_cnt) + ds_idx;
46 if (dependent_rra_idx < rra_idx)
47 seasonal_coef = rrd -> cdp_prep[seasonal_cdp_idx].scratch[CDP_hw_last_seasonal].u_val;
48 else
49 seasonal_coef = rrd -> cdp_prep[seasonal_cdp_idx].scratch[CDP_hw_seasonal].u_val;
51 /* compute the prediction */
52 if (isnan(coefs[CDP_hw_intercept].u_val) || isnan(coefs[CDP_hw_slope].u_val)
53 || isnan(seasonal_coef))
54 {
55 prediction = DNAN;
57 /* bootstrap initialization of slope and intercept */
58 if (isnan(coefs[CDP_hw_intercept].u_val) &&
59 !isnan(coefs[CDP_scratch_idx].u_val))
60 {
61 #ifdef DEBUG
62 fprintf(stderr,"Initialization of slope/intercept\n");
63 #endif
64 coefs[CDP_hw_intercept].u_val = coefs[CDP_scratch_idx].u_val;
65 coefs[CDP_hw_last_intercept].u_val = coefs[CDP_scratch_idx].u_val;
66 /* initialize the slope to 0 */
67 coefs[CDP_hw_slope].u_val = 0.0;
68 coefs[CDP_hw_last_slope].u_val = 0.0;
69 /* initialize null count to 1 */
70 coefs[CDP_null_count].u_cnt = 1;
71 coefs[CDP_last_null_count].u_cnt = 1;
72 }
73 /* if seasonal coefficient is NA, then don't update intercept, slope */
74 } else {
75 prediction = coefs[CDP_hw_intercept].u_val +
76 (coefs[CDP_hw_slope].u_val)*(coefs[CDP_null_count].u_cnt)
77 + seasonal_coef;
78 #ifdef DEBUG
79 fprintf(stderr,"computed prediction: %f\n",prediction);
80 #endif
81 if (isnan(coefs[CDP_scratch_idx].u_val))
82 {
83 /* NA value, no updates of intercept, slope;
84 * increment the null count */
85 (coefs[CDP_null_count].u_cnt)++;
86 } else {
87 #ifdef DEBUG
88 fprintf(stderr,"Updating intercept, slope\n");
89 #endif
90 /* update the intercept */
91 coefs[CDP_hw_intercept].u_val = (current_rra -> par[RRA_hw_alpha].u_val)*
92 (coefs[CDP_scratch_idx].u_val - seasonal_coef) +
93 (1 - current_rra -> par[RRA_hw_alpha].u_val)*(coefs[CDP_hw_intercept].u_val
94 + (coefs[CDP_hw_slope].u_val)*(coefs[CDP_null_count].u_cnt));
95 /* update the slope */
96 coefs[CDP_hw_slope].u_val = (current_rra -> par[RRA_hw_beta].u_val)*
97 (coefs[CDP_hw_intercept].u_val - coefs[CDP_hw_last_intercept].u_val) +
98 (1 - current_rra -> par[RRA_hw_beta].u_val)*(coefs[CDP_hw_slope].u_val);
99 /* reset the null count */
100 coefs[CDP_null_count].u_cnt = 1;
101 }
102 }
104 /* store the prediction for writing */
105 coefs[CDP_scratch_idx].u_val = prediction;
106 return 0;
107 }
109 int
110 lookup_seasonal(rrd_t *rrd, unsigned long rra_idx, unsigned long rra_start,
111 rrd_file_t* rrd_file, unsigned long offset, rrd_value_t **seasonal_coef)
112 {
113 unsigned long pos_tmp;
114 /* rra_ptr[].cur_row points to the rra row to be written; this function
115 * reads cur_row + offset */
116 unsigned long row_idx = rrd -> rra_ptr[rra_idx].cur_row + offset;
117 /* handle wrap around */
118 if (row_idx >= rrd -> rra_def[rra_idx].row_cnt)
119 row_idx = row_idx % (rrd -> rra_def[rra_idx].row_cnt);
121 /* rra_start points to the appropriate rra block in the file */
122 /* compute the pointer to the appropriate location in the file */
123 pos_tmp = rra_start + (row_idx)*(rrd -> stat_head -> ds_cnt)*sizeof(rrd_value_t);
125 /* allocate memory if need be */
126 if (*seasonal_coef == NULL)
127 *seasonal_coef =
128 (rrd_value_t *) malloc((rrd -> stat_head -> ds_cnt)*sizeof(rrd_value_t));
129 if (*seasonal_coef == NULL) {
130 rrd_set_error("memory allocation failure: seasonal coef");
131 return -1;
132 }
134 if (!rrd_seek(rrd_file,pos_tmp,SEEK_SET))
135 {
136 if (rrd_read(rrd_file,*seasonal_coef,sizeof(rrd_value_t)*rrd->stat_head->ds_cnt)
137 == (ssize_t)(sizeof(rrd_value_t)*rrd->stat_head->ds_cnt))
138 {
139 /* success! */
140 /* we can safely ignore the rule requiring a seek operation between read
141 * and write, because this read moves the file pointer to somewhere
142 * in the file other than the next write location.
143 * */
144 return 0;
145 } else {
146 rrd_set_error("read operation failed in lookup_seasonal(): %lu\n",pos_tmp);
147 }
148 } else {
149 rrd_set_error("seek operation failed in lookup_seasonal(): %lu\n",pos_tmp);
150 }
152 return -1;
153 }
155 int
156 update_seasonal(rrd_t *rrd, unsigned long cdp_idx, unsigned long rra_idx,
157 unsigned long ds_idx, unsigned short CDP_scratch_idx, rrd_value_t *seasonal_coef)
158 {
159 /* TODO: extract common if subblocks in the wake of I/O optimization */
160 rrd_value_t intercept, seasonal;
161 rra_def_t *current_rra = &(rrd -> rra_def[rra_idx]);
162 rra_def_t *hw_rra = &(rrd -> rra_def[current_rra -> par[RRA_dependent_rra_idx].u_cnt]);
163 /* obtain cdp_prep index for HWPREDICT */
164 unsigned long hw_cdp_idx = (current_rra -> par[RRA_dependent_rra_idx].u_cnt)
165 * (rrd -> stat_head -> ds_cnt) + ds_idx;
166 unival *coefs = rrd -> cdp_prep[hw_cdp_idx].scratch;
168 /* update seasonal coefficient in cdp prep areas */
169 seasonal = rrd -> cdp_prep[cdp_idx].scratch[CDP_hw_seasonal].u_val;
170 rrd -> cdp_prep[cdp_idx].scratch[CDP_hw_last_seasonal].u_val = seasonal;
171 rrd -> cdp_prep[cdp_idx].scratch[CDP_hw_seasonal].u_val =
172 seasonal_coef[ds_idx];
174 /* update seasonal value for disk */
175 if (current_rra -> par[RRA_dependent_rra_idx].u_cnt < rra_idx)
176 /* associated HWPREDICT has already been updated */
177 /* check for possible NA values */
178 if (isnan(rrd -> cdp_prep[cdp_idx].scratch[CDP_scratch_idx].u_val))
179 {
180 /* no update, store the old value unchanged,
181 * doesn't matter if it is NA */
182 rrd -> cdp_prep[cdp_idx].scratch[CDP_scratch_idx].u_val = seasonal;
183 } else if (isnan(coefs[CDP_hw_last_intercept].u_val)
184 || isnan(coefs[CDP_hw_last_slope].u_val))
185 {
186 /* this should never happen, as HWPREDICT was already updated */
187 rrd -> cdp_prep[cdp_idx].scratch[CDP_scratch_idx].u_val= DNAN;
188 } else if (isnan(seasonal))
189 {
190 /* initialization: intercept is not currently being updated */
191 #ifdef DEBUG
192 fprintf(stderr,"Initialization of seasonal coef %lu\n",
193 rrd -> rra_ptr[rra_idx].cur_row);
194 #endif
195 rrd -> cdp_prep[cdp_idx].scratch[CDP_scratch_idx].u_val
196 -= coefs[CDP_hw_last_intercept].u_val;
197 } else {
198 intercept = coefs[CDP_hw_intercept].u_val;
199 #ifdef DEBUG
200 fprintf(stderr,
201 "Updating seasonal, params: gamma %f, new intercept %f, old seasonal %f\n",
202 current_rra -> par[RRA_seasonal_gamma].u_val,
203 intercept, seasonal);
204 #endif
205 rrd -> cdp_prep[cdp_idx].scratch[CDP_scratch_idx].u_val =
206 (current_rra -> par[RRA_seasonal_gamma].u_val)*
207 (rrd -> cdp_prep[cdp_idx].scratch[CDP_scratch_idx].u_val - intercept) +
208 (1 - current_rra -> par[RRA_seasonal_gamma].u_val)*seasonal;
209 }
210 else {
211 /* SEASONAL array is updated first, which means the new intercept
212 * hasn't be computed; so we compute it here. */
214 /* check for possible NA values */
215 if (isnan(rrd -> cdp_prep[cdp_idx].scratch[CDP_scratch_idx].u_val))
216 {
217 /* no update, simple store the old value unchanged,
218 * doesn't matter if it is NA */
219 rrd -> cdp_prep[cdp_idx].scratch[CDP_scratch_idx].u_val = seasonal;
220 } else if (isnan(coefs[CDP_hw_intercept].u_val)
221 || isnan(coefs[CDP_hw_slope].u_val))
222 {
223 /* Initialization of slope and intercept will occur.
224 * force seasonal coefficient to 0. */
225 rrd -> cdp_prep[cdp_idx].scratch[CDP_scratch_idx].u_val= 0.0;
226 } else if (isnan(seasonal))
227 {
228 /* initialization: intercept will not be updated
229 * CDP_hw_intercept = CDP_hw_last_intercept; just need to
230 * subtract this baseline value. */
231 #ifdef DEBUG
232 fprintf(stderr,"Initialization of seasonal coef %lu\n",
233 rrd -> rra_ptr[rra_idx].cur_row);
234 #endif
235 rrd -> cdp_prep[cdp_idx].scratch[CDP_scratch_idx].u_val -= coefs[CDP_hw_intercept].u_val;
236 } else {
237 /* Note that we must get CDP_scratch_idx from SEASONAL array, as CDP_scratch_idx
238 * for HWPREDICT array will be DNAN. */
239 intercept = (hw_rra -> par[RRA_hw_alpha].u_val)*
240 (rrd -> cdp_prep[cdp_idx].scratch[CDP_scratch_idx].u_val - seasonal)
241 + (1 - hw_rra -> par[RRA_hw_alpha].u_val)*(coefs[CDP_hw_intercept].u_val
242 + (coefs[CDP_hw_slope].u_val)*(coefs[CDP_null_count].u_cnt));
243 rrd -> cdp_prep[cdp_idx].scratch[CDP_scratch_idx].u_val =
244 (current_rra -> par[RRA_seasonal_gamma].u_val)*
245 (rrd -> cdp_prep[cdp_idx].scratch[CDP_scratch_idx].u_val - intercept) +
246 (1 - current_rra -> par[RRA_seasonal_gamma].u_val)*seasonal;
247 }
248 }
249 #ifdef DEBUG
250 fprintf(stderr,"seasonal coefficient set= %f\n",
251 rrd -> cdp_prep[cdp_idx].scratch[CDP_scratch_idx].u_val);
252 #endif
253 return 0;
254 }
256 int
257 update_devpredict(rrd_t *rrd, unsigned long cdp_idx,
258 unsigned long rra_idx, unsigned long ds_idx, unsigned short CDP_scratch_idx)
259 {
260 /* there really isn't any "update" here; the only reason this information
261 * is stored separately from DEVSEASONAL is to preserve deviation predictions
262 * for a longer duration than one seasonal cycle. */
263 unsigned long seasonal_cdp_idx = (rrd -> rra_def[rra_idx].par[RRA_dependent_rra_idx].u_cnt)
264 * (rrd -> stat_head -> ds_cnt) + ds_idx;
266 if (rrd -> rra_def[rra_idx].par[RRA_dependent_rra_idx].u_cnt < rra_idx)
267 {
268 /* associated DEVSEASONAL array already updated */
269 rrd -> cdp_prep[cdp_idx].scratch[CDP_scratch_idx].u_val
270 = rrd -> cdp_prep[seasonal_cdp_idx].scratch[CDP_last_seasonal_deviation].u_val;
271 } else {
272 /* associated DEVSEASONAL not yet updated */
273 rrd -> cdp_prep[cdp_idx].scratch[CDP_scratch_idx].u_val
274 = rrd -> cdp_prep[seasonal_cdp_idx].scratch[CDP_seasonal_deviation].u_val;
275 }
276 return 0;
277 }
279 int
280 update_devseasonal(rrd_t *rrd, unsigned long cdp_idx, unsigned long rra_idx,
281 unsigned long ds_idx, unsigned short CDP_scratch_idx,
282 rrd_value_t *seasonal_dev)
283 {
284 rrd_value_t prediction = 0, seasonal_coef = DNAN;
285 rra_def_t *current_rra = &(rrd -> rra_def[rra_idx]);
286 /* obtain cdp_prep index for HWPREDICT */
287 unsigned long hw_rra_idx = current_rra -> par[RRA_dependent_rra_idx].u_cnt;
288 unsigned long hw_cdp_idx = hw_rra_idx * (rrd -> stat_head -> ds_cnt) + ds_idx;
289 unsigned long seasonal_cdp_idx;
290 unival *coefs = rrd -> cdp_prep[hw_cdp_idx].scratch;
292 rrd -> cdp_prep[cdp_idx].scratch[CDP_last_seasonal_deviation].u_val =
293 rrd -> cdp_prep[cdp_idx].scratch[CDP_seasonal_deviation].u_val;
294 /* retrieve the next seasonal deviation value, could be NA */
295 rrd -> cdp_prep[cdp_idx].scratch[CDP_seasonal_deviation].u_val =
296 seasonal_dev[ds_idx];
298 /* retrieve the current seasonal_coef (not to be confused with the
299 * current seasonal deviation). Could make this more readable by introducing
300 * some wrapper functions. */
301 seasonal_cdp_idx = (rrd -> rra_def[hw_rra_idx].par[RRA_dependent_rra_idx].u_cnt)
302 *(rrd -> stat_head -> ds_cnt) + ds_idx;
303 if (rrd -> rra_def[hw_rra_idx].par[RRA_dependent_rra_idx].u_cnt < rra_idx)
304 /* SEASONAL array already updated */
305 seasonal_coef = rrd -> cdp_prep[seasonal_cdp_idx].scratch[CDP_hw_last_seasonal].u_val;
306 else
307 /* SEASONAL array not yet updated */
308 seasonal_coef = rrd -> cdp_prep[seasonal_cdp_idx].scratch[CDP_hw_seasonal].u_val;
310 /* compute the abs value of the difference between the prediction and
311 * observed value */
312 if (hw_rra_idx < rra_idx)
313 {
314 /* associated HWPREDICT has already been updated */
315 if (isnan(coefs[CDP_hw_last_intercept].u_val) ||
316 isnan(coefs[CDP_hw_last_slope].u_val) ||
317 isnan(seasonal_coef))
318 {
319 /* one of the prediction values is uinitialized */
320 rrd -> cdp_prep[cdp_idx].scratch[CDP_scratch_idx].u_val = DNAN;
321 return 0;
322 } else {
323 prediction = coefs[CDP_hw_last_intercept].u_val +
324 (coefs[CDP_hw_last_slope].u_val)*(coefs[CDP_last_null_count].u_cnt)
325 + seasonal_coef;
326 }
327 } else {
328 /* associated HWPREDICT has NOT been updated */
329 if (isnan(coefs[CDP_hw_intercept].u_val) ||
330 isnan(coefs[CDP_hw_slope].u_val) ||
331 isnan(seasonal_coef))
332 {
333 /* one of the prediction values is uinitialized */
334 rrd -> cdp_prep[cdp_idx].scratch[CDP_scratch_idx].u_val = DNAN;
335 return 0;
336 } else {
337 prediction = coefs[CDP_hw_intercept].u_val +
338 (coefs[CDP_hw_slope].u_val)*(coefs[CDP_null_count].u_cnt)
339 + seasonal_coef;
340 }
341 }
343 if (isnan(rrd -> cdp_prep[cdp_idx].scratch[CDP_scratch_idx].u_val))
344 {
345 /* no update, store existing value unchanged, doesn't
346 * matter if it is NA */
347 rrd -> cdp_prep[cdp_idx].scratch[CDP_scratch_idx].u_val =
348 rrd -> cdp_prep[cdp_idx].scratch[CDP_last_seasonal_deviation].u_val;
349 } else if (isnan(rrd -> cdp_prep[cdp_idx].scratch[CDP_last_seasonal_deviation].u_val))
350 {
351 /* initialization */
352 #ifdef DEBUG
353 fprintf(stderr,"Initialization of seasonal deviation\n");
354 #endif
355 rrd -> cdp_prep[cdp_idx].scratch[CDP_scratch_idx].u_val =
356 fabs(prediction - rrd -> cdp_prep[cdp_idx].scratch[CDP_scratch_idx].u_val);
357 } else {
358 /* exponential smoothing update */
359 rrd -> cdp_prep[cdp_idx].scratch[CDP_scratch_idx].u_val =
360 (rrd -> rra_def[rra_idx].par[RRA_seasonal_gamma].u_val)*
361 fabs(prediction - rrd -> cdp_prep[cdp_idx].scratch[CDP_scratch_idx].u_val)
362 + (1 - rrd -> rra_def[rra_idx].par[RRA_seasonal_gamma].u_val)*
363 (rrd -> cdp_prep[cdp_idx].scratch[CDP_last_seasonal_deviation].u_val);
364 }
365 return 0;
366 }
368 /* Check for a failure based on a threshold # of violations within the specified
369 * window. */
370 int
371 update_failures(rrd_t *rrd, unsigned long cdp_idx, unsigned long rra_idx,
372 unsigned long ds_idx, unsigned short CDP_scratch_idx)
373 {
374 /* detection of a violation depends on 3 RRAs:
375 * HWPREDICT, SEASONAL, and DEVSEASONAL */
376 rra_def_t *current_rra = &(rrd -> rra_def[rra_idx]);
377 unsigned long dev_rra_idx = current_rra -> par[RRA_dependent_rra_idx].u_cnt;
378 rra_def_t *dev_rra = &(rrd -> rra_def[dev_rra_idx]);
379 unsigned long hw_rra_idx = dev_rra -> par[RRA_dependent_rra_idx].u_cnt;
380 rra_def_t *hw_rra = &(rrd -> rra_def[hw_rra_idx]);
381 unsigned long seasonal_rra_idx = hw_rra -> par[RRA_dependent_rra_idx].u_cnt;
382 unsigned long temp_cdp_idx;
383 rrd_value_t deviation = DNAN;
384 rrd_value_t seasonal_coef = DNAN;
385 rrd_value_t prediction = DNAN;
386 char violation = 0;
387 unsigned short violation_cnt = 0, i;
388 char *violations_array;
390 /* usual checks to determine the order of the RRAs */
391 temp_cdp_idx = dev_rra_idx * (rrd -> stat_head -> ds_cnt) + ds_idx;
392 if (rra_idx < seasonal_rra_idx)
393 {
394 /* DEVSEASONAL not yet updated */
395 deviation = rrd -> cdp_prep[temp_cdp_idx].scratch[CDP_seasonal_deviation].u_val;
396 } else {
397 /* DEVSEASONAL already updated */
398 deviation = rrd -> cdp_prep[temp_cdp_idx].scratch[CDP_last_seasonal_deviation].u_val;
399 }
400 if (!isnan(deviation)) {
402 temp_cdp_idx = seasonal_rra_idx * (rrd -> stat_head -> ds_cnt) + ds_idx;
403 if (rra_idx < seasonal_rra_idx)
404 {
405 /* SEASONAL not yet updated */
406 seasonal_coef = rrd -> cdp_prep[temp_cdp_idx].scratch[CDP_hw_seasonal].u_val;
407 } else {
408 /* SEASONAL already updated */
409 seasonal_coef = rrd -> cdp_prep[temp_cdp_idx].scratch[CDP_hw_last_seasonal].u_val;
410 }
411 /* in this code block, we know seasonal coef is not DNAN, because deviation is not
412 * null */
414 temp_cdp_idx = hw_rra_idx * (rrd -> stat_head -> ds_cnt) + ds_idx;
415 if (rra_idx < hw_rra_idx)
416 {
417 /* HWPREDICT not yet updated */
418 prediction = rrd -> cdp_prep[temp_cdp_idx].scratch[CDP_hw_intercept].u_val +
419 (rrd -> cdp_prep[temp_cdp_idx].scratch[CDP_hw_slope].u_val)
420 *(rrd -> cdp_prep[temp_cdp_idx].scratch[CDP_null_count].u_cnt)
421 + seasonal_coef;
422 } else {
423 /* HWPREDICT already updated */
424 prediction = rrd -> cdp_prep[temp_cdp_idx].scratch[CDP_hw_last_intercept].u_val +
425 (rrd -> cdp_prep[temp_cdp_idx].scratch[CDP_hw_last_slope].u_val)
426 *(rrd -> cdp_prep[temp_cdp_idx].scratch[CDP_last_null_count].u_cnt)
427 + seasonal_coef;
428 }
430 /* determine if the observed value is a violation */
431 if (!isnan(rrd -> cdp_prep[cdp_idx].scratch[CDP_scratch_idx].u_val))
432 {
433 if (rrd -> cdp_prep[cdp_idx].scratch[CDP_scratch_idx].u_val > prediction +
434 (current_rra -> par[RRA_delta_pos].u_val)*deviation
435 || rrd -> cdp_prep[cdp_idx].scratch[CDP_scratch_idx].u_val < prediction -
436 (current_rra -> par[RRA_delta_neg].u_val)*deviation)
437 violation = 1;
438 } else {
439 violation = 1; /* count DNAN values as violations */
440 }
442 }
444 /* determine if a failure has occurred and update the failure array */
445 violation_cnt = violation;
446 violations_array = (char *) ((void *) rrd -> cdp_prep[cdp_idx].scratch);
447 for (i = current_rra -> par[RRA_window_len].u_cnt; i > 1; i--)
448 {
449 /* shift */
450 violations_array[i-1] = violations_array[i-2];
451 violation_cnt += violations_array[i-1];
452 }
453 violations_array[0] = violation;
455 if (violation_cnt < current_rra -> par[RRA_failure_threshold].u_cnt)
456 /* not a failure */
457 rrd -> cdp_prep[cdp_idx].scratch[CDP_scratch_idx].u_val = 0.0;
458 else
459 rrd -> cdp_prep[cdp_idx].scratch[CDP_scratch_idx].u_val = 1.0;
461 return (rrd-> cdp_prep[cdp_idx].scratch[CDP_scratch_idx].u_val);
462 }
464 /* For the specified CDP prep area and the FAILURES RRA,
465 * erase all history of past violations.
466 */
467 void
468 erase_violations(rrd_t *rrd, unsigned long cdp_idx, unsigned long rra_idx)
469 {
470 unsigned short i;
471 char *violations_array;
472 /* check that rra_idx is a CF_FAILURES array */
473 if (cf_conv(rrd -> rra_def[rra_idx].cf_nam) != CF_FAILURES)
474 {
475 #ifdef DEBUG
476 fprintf(stderr,"erase_violations called for non-FAILURES RRA: %s\n",
477 rrd -> rra_def[rra_idx].cf_nam);
478 #endif
479 return;
480 }
482 #ifdef DEBUG
483 fprintf(stderr,"scratch buffer before erase:\n");
484 for (i = 0; i < MAX_CDP_PAR_EN; i++)
485 {
486 fprintf(stderr,"%lu ", rrd -> cdp_prep[cdp_idx].scratch[i].u_cnt);
487 }
488 fprintf(stderr,"\n");
489 #endif
491 /* WARNING: an array of longs on disk is treated as an array of chars
492 * in memory. */
493 violations_array = (char *) ((void *) rrd -> cdp_prep[cdp_idx].scratch);
494 /* erase everything in the part of the CDP scratch array that will be
495 * used to store violations for the current window */
496 for (i = rrd -> rra_def[rra_idx].par[RRA_window_len].u_cnt; i > 0; i--)
497 {
498 violations_array[i-1] = 0;
499 }
500 #ifdef DEBUG
501 fprintf(stderr,"scratch buffer after erase:\n");
502 for (i = 0; i < MAX_CDP_PAR_EN; i++)
503 {
504 fprintf(stderr,"%lu ", rrd -> cdp_prep[cdp_idx].scratch[i].u_cnt);
505 }
506 fprintf(stderr,"\n");
507 #endif
508 }
510 /* Smooth a periodic array with a moving average: equal weights and
511 * length = 5% of the period. */
512 int
513 apply_smoother(rrd_t *rrd, unsigned long rra_idx, unsigned long rra_start,
514 rrd_file_t *rrd_file)
515 {
516 unsigned long i, j, k;
517 unsigned long totalbytes;
518 rrd_value_t *rrd_values;
519 unsigned long row_length = rrd -> stat_head -> ds_cnt;
520 unsigned long row_count = rrd -> rra_def[rra_idx].row_cnt;
521 unsigned long offset;
522 FIFOqueue **buffers;
523 rrd_value_t *working_average;
524 rrd_value_t *baseline;
526 offset = floor(0.025*row_count);
527 if (offset == 0) return 0; /* no smoothing */
529 /* allocate memory */
530 totalbytes = sizeof(rrd_value_t)*row_length*row_count;
531 rrd_values = (rrd_value_t *) malloc(totalbytes);
532 if (rrd_values == NULL)
533 {
534 rrd_set_error("apply smoother: memory allocation failure");
535 return -1;
536 }
538 /* rra_start is at the beginning of this rra */
539 if (rrd_seek(rrd_file,rra_start,SEEK_SET))
540 {
541 rrd_set_error("seek to rra %d failed", rra_start);
542 free(rrd_values);
543 return -1;
544 }
545 rrd_flush(rrd_file);
546 /* could read all data in a single block, but we need to
547 * check for NA values */
548 for (i = 0; i < row_count; ++i)
549 {
550 for (j = 0; j < row_length; ++j)
551 {
552 if (rrd_read(rrd_file, &(rrd_values[i*row_length + j]),sizeof(rrd_value_t)*1)
553 != (ssize_t)(sizeof(rrd_value_t)*1)) {
554 rrd_set_error("reading value failed: %s", rrd_strerror(errno));
555 }
556 if (isnan(rrd_values[i*row_length + j])) {
557 /* can't apply smoothing, still uninitialized values */
558 #ifdef DEBUG
559 fprintf(stderr,"apply_smoother: NA detected in seasonal array: %ld %ld\n",i,j);
560 #endif
561 free(rrd_values);
562 return 0;
563 }
564 }
565 }
567 /* allocate queues, one for each data source */
568 buffers = (FIFOqueue **) malloc(sizeof(FIFOqueue *)*row_length);
569 for (i = 0; i < row_length; ++i)
570 {
571 queue_alloc(&(buffers[i]),2*offset + 1);
572 }
573 /* need working average initialized to 0 */
574 working_average = (rrd_value_t *) calloc(row_length,sizeof(rrd_value_t));
575 baseline = (rrd_value_t *) calloc(row_length,sizeof(rrd_value_t));
577 /* compute sums of the first 2*offset terms */
578 for (i = 0; i < 2*offset; ++i)
579 {
580 k = MyMod(i - offset,row_count);
581 for (j = 0; j < row_length; ++j)
582 {
583 queue_push(buffers[j],rrd_values[k*row_length + j]);
584 working_average[j] += rrd_values[k*row_length + j];
585 }
586 }
588 /* compute moving averages */
589 for (i = offset; i < row_count + offset; ++i)
590 {
591 for (j = 0; j < row_length; ++j)
592 {
593 k = MyMod(i,row_count);
594 /* add a term to the sum */
595 working_average[j] += rrd_values[k*row_length + j];
596 queue_push(buffers[j],rrd_values[k*row_length + j]);
598 /* reset k to be the center of the window */
599 k = MyMod(i - offset,row_count);
600 /* overwrite rdd_values entry, the old value is already
601 * saved in buffers */
602 rrd_values[k*row_length + j] = working_average[j]/(2*offset + 1);
603 baseline[j] += rrd_values[k*row_length + j];
605 /* remove a term from the sum */
606 working_average[j] -= queue_pop(buffers[j]);
607 }
608 }
610 for (i = 0; i < row_length; ++i)
611 {
612 queue_dealloc(buffers[i]);
613 baseline[i] /= row_count;
614 }
615 free(buffers);
616 free(working_average);
618 if (cf_conv(rrd->rra_def[rra_idx].cf_nam) == CF_SEASONAL) {
619 for (j = 0; j < row_length; ++j)
620 {
621 for (i = 0; i < row_count; ++i)
622 {
623 rrd_values[i*row_length + j] -= baseline[j];
624 }
625 /* update the baseline coefficient,
626 * first, compute the cdp_index. */
627 offset = (rrd->rra_def[rra_idx].par[RRA_dependent_rra_idx].u_cnt)
628 * row_length + j;
629 (rrd->cdp_prep[offset]).scratch[CDP_hw_intercept].u_val += baseline[j];
630 }
631 /* flush cdp to disk */
632 rrd_flush(rrd_file);
633 if (rrd_seek(rrd_file,sizeof(stat_head_t) +
634 rrd->stat_head->ds_cnt * sizeof(ds_def_t) +
635 rrd->stat_head->rra_cnt * sizeof(rra_def_t) +
636 sizeof(live_head_t) +
637 rrd->stat_head->ds_cnt * sizeof(pdp_prep_t),SEEK_SET))
638 {
639 rrd_set_error("apply_smoother: seek to cdp_prep failed");
640 free(rrd_values);
641 return -1;
642 }
643 if (rrd_write(rrd_file, rrd -> cdp_prep,
644 sizeof(cdp_prep_t)*
645 (rrd->stat_head->rra_cnt) * rrd->stat_head->ds_cnt)
646 != (ssize_t)(sizeof(cdp_prep_t)*(rrd->stat_head->rra_cnt) * (rrd->stat_head->ds_cnt)))
647 {
648 rrd_set_error("apply_smoother: cdp_prep write failed");
649 free(rrd_values);
650 return -1;
651 }
652 } /* endif CF_SEASONAL */
654 /* flush updated values to disk */
655 rrd_flush(rrd_file);
656 if (rrd_seek(rrd_file,rra_start,SEEK_SET))
657 {
658 rrd_set_error("apply_smoother: seek to pos %d failed", rra_start);
659 free(rrd_values);
660 return -1;
661 }
662 /* write as a single block */
663 if (rrd_write(rrd_file,rrd_values,sizeof(rrd_value_t)*row_length*row_count)
664 != (ssize_t)(sizeof(rrd_value_t)*row_length*row_count))
665 {
666 rrd_set_error("apply_smoother: write failed to %lu",rra_start);
667 free(rrd_values);
668 return -1;
669 }
671 rrd_flush(rrd_file);
672 free(rrd_values);
673 free(baseline);
674 return 0;
675 }
677 /* Reset aberrant behavior model coefficients, including intercept, slope,
678 * seasonal, and seasonal deviation for the specified data source. */
679 void
680 reset_aberrant_coefficients(rrd_t *rrd, rrd_file_t *rrd_file, unsigned long ds_idx)
681 {
682 unsigned long cdp_idx, rra_idx, i;
683 unsigned long cdp_start, rra_start;
684 rrd_value_t nan_buffer = DNAN;
686 /* compute the offset for the cdp area */
687 cdp_start = sizeof(stat_head_t) +
688 rrd->stat_head->ds_cnt * sizeof(ds_def_t) +
689 rrd->stat_head->rra_cnt * sizeof(rra_def_t) +
690 sizeof(live_head_t) +
691 rrd->stat_head->ds_cnt * sizeof(pdp_prep_t);
692 /* compute the offset for the first rra */
693 rra_start = cdp_start +
694 (rrd->stat_head->ds_cnt) * (rrd->stat_head->rra_cnt) * sizeof(cdp_prep_t) +
695 rrd->stat_head->rra_cnt * sizeof(rra_ptr_t);
697 /* loop over the RRAs */
698 for (rra_idx = 0; rra_idx < rrd -> stat_head -> rra_cnt; rra_idx++)
699 {
700 cdp_idx = rra_idx * (rrd-> stat_head-> ds_cnt) + ds_idx;
701 switch (cf_conv(rrd -> rra_def[rra_idx].cf_nam))
702 {
703 case CF_HWPREDICT:
704 init_hwpredict_cdp(&(rrd -> cdp_prep[cdp_idx]));
705 break;
706 case CF_SEASONAL:
707 case CF_DEVSEASONAL:
708 /* don't use init_seasonal because it will reset burn-in, which
709 * means different data sources will be calling for the smoother
710 * at different times. */
711 rrd->cdp_prep[cdp_idx].scratch[CDP_hw_seasonal].u_val = DNAN;
712 rrd->cdp_prep[cdp_idx].scratch[CDP_hw_last_seasonal].u_val = DNAN;
713 /* move to first entry of data source for this rra */
714 rrd_seek(rrd_file,rra_start + ds_idx * sizeof(rrd_value_t),SEEK_SET);
715 /* entries for the same data source are not contiguous,
716 * temporal entries are contiguous */
717 for (i = 0; i < rrd->rra_def[rra_idx].row_cnt; ++i)
718 {
719 if (rrd_write(rrd_file,&nan_buffer,sizeof(rrd_value_t)*1) != sizeof(rrd_value_t)*1)
720 {
721 rrd_set_error(
722 "reset_aberrant_coefficients: write failed data source %lu rra %s",
723 ds_idx,rrd->rra_def[rra_idx].cf_nam);
724 return;
725 }
726 rrd_seek(rrd_file,(rrd->stat_head->ds_cnt - 1) *
727 sizeof(rrd_value_t),SEEK_CUR);
728 }
729 break;
730 case CF_FAILURES:
731 erase_violations(rrd,cdp_idx,rra_idx);
732 break;
733 default:
734 break;
735 }
736 /* move offset to the next rra */
737 rra_start += rrd->rra_def[rra_idx].row_cnt * rrd->stat_head->ds_cnt *
738 sizeof(rrd_value_t);
739 }
740 rrd_seek(rrd_file,cdp_start,SEEK_SET);
741 if (rrd_write(rrd_file,rrd->cdp_prep,
742 sizeof(cdp_prep_t)*
743 (rrd->stat_head->rra_cnt) * rrd->stat_head->ds_cnt)
744 != (ssize_t)(sizeof(cdp_prep_t)*(rrd->stat_head->rra_cnt) * (rrd->stat_head->ds_cnt)))
745 {
746 rrd_set_error("reset_aberrant_coefficients: cdp_prep write failed");
747 return;/*XXX: delme */
748 }
749 }
751 void init_hwpredict_cdp(cdp_prep_t *cdp)
752 {
753 cdp->scratch[CDP_hw_intercept].u_val = DNAN;
754 cdp->scratch[CDP_hw_last_intercept].u_val = DNAN;
755 cdp->scratch[CDP_hw_slope].u_val = DNAN;
756 cdp->scratch[CDP_hw_last_slope].u_val = DNAN;
757 cdp->scratch[CDP_null_count].u_cnt = 1;
758 cdp->scratch[CDP_last_null_count].u_cnt = 1;
759 }
761 void init_seasonal_cdp(cdp_prep_t *cdp)
762 {
763 cdp->scratch[CDP_hw_seasonal].u_val = DNAN;
764 cdp->scratch[CDP_hw_last_seasonal].u_val = DNAN;
765 cdp->scratch[CDP_init_seasonal].u_cnt = 1;
766 }
768 int
769 update_aberrant_CF(rrd_t *rrd, rrd_value_t pdp_val, enum cf_en current_cf,
770 unsigned long cdp_idx, unsigned long rra_idx, unsigned long ds_idx,
771 unsigned short CDP_scratch_idx, rrd_value_t *seasonal_coef)
772 {
773 rrd -> cdp_prep[cdp_idx].scratch[CDP_scratch_idx].u_val = pdp_val;
774 switch (current_cf) {
775 case CF_AVERAGE:
776 default:
777 return 0;
778 case CF_HWPREDICT:
779 return update_hwpredict(rrd,cdp_idx,rra_idx,ds_idx,CDP_scratch_idx);
780 case CF_DEVPREDICT:
781 return update_devpredict(rrd,cdp_idx,rra_idx,ds_idx,CDP_scratch_idx);
782 case CF_SEASONAL:
783 return update_seasonal(rrd,cdp_idx,rra_idx,ds_idx,CDP_scratch_idx,seasonal_coef);
784 case CF_DEVSEASONAL:
785 return update_devseasonal(rrd,cdp_idx,rra_idx,ds_idx,CDP_scratch_idx,seasonal_coef);
786 case CF_FAILURES:
787 return update_failures(rrd,cdp_idx,rra_idx,ds_idx,CDP_scratch_idx);
788 }
789 return -1;
790 }
792 unsigned long MyMod(signed long val, unsigned long mod)
793 {
794 unsigned long new_val;
795 if (val < 0)
796 new_val = ((unsigned long) abs(val)) % mod;
797 else
798 new_val = (val % mod);
800 if (val < 0)
801 return (mod - new_val);
802 else
803 return (new_val);
804 }
806 /* a standard fixed-capacity FIF0 queue implementation
807 * No overflow checking is performed. */
808 int queue_alloc(FIFOqueue **q,int capacity)
809 {
810 *q = (FIFOqueue *) malloc(sizeof(FIFOqueue));
811 if (*q == NULL) return -1;
812 (*q) -> queue = (rrd_value_t *) malloc(sizeof(rrd_value_t)*capacity);
813 if ((*q) -> queue == NULL)
814 {
815 free(*q);
816 return -1;
817 }
818 (*q) -> capacity = capacity;
819 (*q) -> head = capacity;
820 (*q) -> tail = 0;
821 return 0;
822 }
824 int queue_isempty(FIFOqueue *q)
825 {
826 return (q -> head % q -> capacity == q -> tail);
827 }
829 void queue_push(FIFOqueue *q, rrd_value_t value)
830 {
831 q -> queue[(q -> tail)++] = value;
832 q -> tail = q -> tail % q -> capacity;
833 }
835 rrd_value_t queue_pop(FIFOqueue *q)
836 {
837 q -> head = q -> head % q -> capacity;
838 return q -> queue[(q -> head)++];
839 }
841 void queue_dealloc(FIFOqueue *q)
842 {
843 free(q -> queue);
844 free(q);
845 }