1 #include "rrd_tool.h"
2 #include <dbi/dbi.h>
3 #include <time.h>
5 /* the structures */
6 struct sql_table_helper {
7 dbi_conn conn;
8 int connected;
9 dbi_result result;
10 char const* filename;
11 char const* dbdriver;
12 char* table_start;
13 char* table_next;
14 char const* where;
15 char * timestamp;
16 char * value;
17 };
19 /* the prototypes */
20 static void _sql_close(struct sql_table_helper* th);
21 static int _sql_setparam(struct sql_table_helper* th,char* key, char* value);
22 static int _sql_fetchrow(struct sql_table_helper* th,time_t *timestamp, rrd_value_t *value,int ordered);
23 static char* _find_next_separator(char* start,char separator);
24 static char* _find_next_separator_twice(char*start,char separator);
25 static char _hexcharhelper(char c);
26 static int _inline_unescape (char* string);
27 static double rrd_fetch_dbi_double(dbi_result *result,int idx);
28 static long rrd_fetch_dbi_long(dbi_result *result,int idx);
30 /* the real code */
32 /* helpers to get correctly converted values from DB*/
33 static long rrd_fetch_dbi_long(dbi_result *result,int idx) {
34 char *ptmp="";
35 long value=DNAN;
36 /* get the attributes for this filed */
37 unsigned int attr=dbi_result_get_field_attribs_idx(result,idx);
38 unsigned int type=dbi_result_get_field_type_idx(result,idx);
39 /* return NAN if NULL */
40 if(dbi_result_field_is_null_idx(result,idx)) { return DNAN; }
41 /* do some conversions */
42 switch (type) {
43 case DBI_TYPE_STRING:
44 ptmp=(char*)dbi_result_get_string_idx(result,idx);
45 value=atoi(ptmp);
46 break;
47 case DBI_TYPE_INTEGER:
48 if (attr & DBI_INTEGER_SIZE1) { value=dbi_result_get_char_idx(result,idx);
49 } else if (attr & DBI_INTEGER_SIZE2) { value=dbi_result_get_short_idx(result,idx);
50 } else if (attr & DBI_INTEGER_SIZE3) { value=dbi_result_get_int_idx(result,idx);
51 } else if (attr & DBI_INTEGER_SIZE4) { value=dbi_result_get_int_idx(result,idx);
52 } else if (attr & DBI_INTEGER_SIZE8) { value=dbi_result_get_longlong_idx(result,idx);
53 } else { value=DNAN;
54 if (getenv("RRDDEBUGSQL")) { fprintf(stderr,"RRDDEBUGSQL: %li: column %i unsupported attribute flags %i for type INTEGER\n",time(NULL),idx,attr ); }
55 }
56 break;
57 case DBI_TYPE_DECIMAL:
58 if (attr & DBI_DECIMAL_SIZE4) { value=floor(dbi_result_get_float_idx(result,idx));
59 } else if (attr & DBI_DECIMAL_SIZE8) { value=floor(dbi_result_get_double_idx(result,idx));
60 } else { value=DNAN;
61 if (getenv("RRDDEBUGSQL")) { fprintf(stderr,"RRDDEBUGSQL: %li: column %i unsupported attribute flags %i for type DECIMAL\n",time(NULL),idx,attr ); }
62 }
63 break;
64 case DBI_TYPE_BINARY:
65 attr=dbi_result_get_field_length_idx(result,idx);
66 ptmp=(char*)dbi_result_get_binary_copy_idx(result,idx);
67 ptmp[attr-1]=0;
68 /* check for "known" libdbi error */
69 if (strncmp("ERROR",ptmp,5)==0) {
70 if (!getenv("RRD_NO_LIBDBI_BUG_WARNING")) {
71 fprintf(stderr,"rrdtool_fetch_libDBI: you have possibly triggered a bug in libDBI by using a (TINY,MEDIUM,LONG) TEXT field with mysql\n this may trigger a core dump in at least one version of libdbi\n if you are not touched by this bug and you find this message annoying\n please set the environment-variable RRD_NO_LIBDBI_BUG_WARNING to ignore this message\n");
72 }
73 }
74 /* convert to number */
75 value=atoi(ptmp);
76 /* free pointer */
77 free(ptmp);
78 break;
79 case DBI_TYPE_DATETIME:
80 value=dbi_result_get_datetime_idx(result,idx);
81 break;
82 default:
83 if (getenv("RRDDEBUGSQL")) { fprintf(stderr,"RRDDEBUGSQL: %li: column %i unsupported type: %i with attribute %i\n",time(NULL),idx,type,attr ); }
84 value=DNAN;
85 break;
86 }
87 return value;
88 }
90 static double rrd_fetch_dbi_double(dbi_result *result,int idx) {
91 char *ptmp="";
92 double value=DNAN;
93 /* get the attributes for this filed */
94 unsigned int attr=dbi_result_get_field_attribs_idx(result,idx);
95 unsigned int type=dbi_result_get_field_type_idx(result,idx);
96 /* return NAN if NULL */
97 if(dbi_result_field_is_null_idx(result,idx)) { return DNAN; }
98 /* do some conversions */
99 switch (type) {
100 case DBI_TYPE_STRING:
101 ptmp=(char*)dbi_result_get_string_idx(result,idx);
102 value=strtod(ptmp,NULL);
103 break;
104 case DBI_TYPE_INTEGER:
105 if (attr & DBI_INTEGER_SIZE1) { value=dbi_result_get_char_idx(result,idx);
106 } else if (attr & DBI_INTEGER_SIZE2) { value=dbi_result_get_short_idx(result,idx);
107 } else if (attr & DBI_INTEGER_SIZE3) { value=dbi_result_get_int_idx(result,idx);
108 } else if (attr & DBI_INTEGER_SIZE4) { value=dbi_result_get_int_idx(result,idx);
109 } else if (attr & DBI_INTEGER_SIZE8) { value=dbi_result_get_longlong_idx(result,idx);
110 } else { value=DNAN;
111 if (getenv("RRDDEBUGSQL")) { fprintf(stderr,"RRDDEBUGSQL: %li: column %i unsupported attribute flags %i for type INTEGER\n",time(NULL),idx,attr ); }
112 }
113 break;
114 case DBI_TYPE_DECIMAL:
115 if (attr & DBI_DECIMAL_SIZE4) { value=dbi_result_get_float_idx(result,idx);
116 } else if (attr & DBI_DECIMAL_SIZE8) { value=dbi_result_get_double_idx(result,idx);
117 } else { value=DNAN;
118 if (getenv("RRDDEBUGSQL")) { fprintf(stderr,"RRDDEBUGSQL: %li: column %i unsupported attribute flags %i for type DECIMAL\n",time(NULL),idx,attr ); }
119 }
120 break;
121 case DBI_TYPE_BINARY:
122 attr=dbi_result_get_field_length_idx(result,idx);
123 ptmp=(char*)dbi_result_get_binary_copy_idx(result,idx);
124 ptmp[attr-1]=0;
125 /* check for "known" libdbi error */
126 if (strncmp("ERROR",ptmp,5)==0) {
127 if (!getenv("RRD_NO_LIBDBI_BUG_WARNING")) {
128 fprintf(stderr,"rrdtool_fetch_libDBI: you have possibly triggered a bug in libDBI by using a (TINY,MEDIUM,LONG) TEXT field with mysql\n this may trigger a core dump in at least one version of libdbi\n if you are not touched by this bug and you find this message annoying\n please set the environment-variable RRD_NO_LIBDBI_BUG_WARNING to ignore this message\n");
129 }
130 }
131 /* convert to number */
132 value=strtod(ptmp,NULL);
133 /* free pointer */
134 free(ptmp);
135 break;
136 case DBI_TYPE_DATETIME:
137 value=dbi_result_get_datetime_idx(result,idx);
138 break;
139 default:
140 if (getenv("RRDDEBUGSQL")) { fprintf(stderr,"RRDDEBUGSQL: %li: column %i unsupported type: %i with attribute %i\n",time(NULL),idx,type,attr ); }
141 value=DNAN;
142 break;
143 }
144 return value;
145 }
147 static void _sql_close(struct sql_table_helper* th) {
148 /* close only if connected */
149 if (th->conn) {
150 if (getenv("RRDDEBUGSQL")) { fprintf(stderr,"RRDDEBUGSQL: %li: close connection\n",time(NULL) ); }
151 /* shutdown dbi */
152 dbi_conn_close(th->conn);
153 if (getenv("RRDDEBUGSQL")) { fprintf(stderr,"RRDDEBUGSQL: %li: shutting down libdbi\n",time(NULL) ); }
154 dbi_shutdown();
155 /* and assign empty */
156 th->conn=NULL;
157 th->connected=0;
158 }
159 }
161 static int _sql_setparam(struct sql_table_helper* th,char* key, char* value) {
162 char* dbi_errstr=NULL;
163 dbi_driver driver;
164 /* if not connected */
165 if (! th->conn) {
166 /* initialize some stuff */
167 th->table_next=th->table_start;
168 th->result=NULL;
169 th->connected=0;
170 /* initialize db */
171 if (getenv("RRDDEBUGSQL")) { fprintf(stderr,"RRDDEBUGSQL: %li: initialize libDBI\n",time(NULL) ); }
172 dbi_initialize(NULL);
173 /* load the driver */
174 driver=dbi_driver_open(th->dbdriver);
175 if (! driver) {
176 rrd_set_error( "libdbi - no such driver: %s (possibly a dynamic link problem of the driver being linked without -ldbi)",th->dbdriver);
177 return -1;
178 }
179 /* and connect to driver */
180 th->conn=dbi_conn_open(driver);
181 /* and handle errors */
182 if (! th->conn) {
183 rrd_set_error( "libdbi - could not open connection to driver %s",th->dbdriver);
184 dbi_shutdown();
185 return -1;
186 }
187 }
188 if (th->connected) {
189 rrd_set_error( "we are already connected - can not set parameter %s=%s",key,value);
190 _sql_close(th);
191 return -1;
192 }
193 if (getenv("RRDDEBUGSQL")) { fprintf(stderr,"RRDDEBUGSQL: %li: setting option %s to %s\n",time(NULL),key,value ); }
194 if (dbi_conn_set_option(th->conn,key,value)) {
195 dbi_conn_error(th->conn,(const char**)&dbi_errstr);
196 rrd_set_error( "libdbi: problems setting %s to %s - %s",key,value,dbi_errstr);
197 _sql_close(th);
198 return -1;
199 }
200 return 0;
201 }
203 static int _sql_fetchrow(struct sql_table_helper* th,time_t *timestamp, rrd_value_t *value,int ordered) {
204 char* dbi_errstr=NULL;
205 char sql[10240];
206 time_t startt=0,endt=0;
207 /*connect to the database if needed */
208 if (! th->conn) {
209 rrd_set_error( "libdbi no parameters set for libdbi",th->filename,dbi_errstr);
210 return -1;
211 }
212 if (! th->connected) {
213 /* and now connect */
214 if (getenv("RRDDEBUGSQL")) { fprintf(stderr,"RRDDEBUGSQL: %li: connect to DB\n",time(NULL) ); }
215 if (dbi_conn_connect(th->conn) <0) {
216 dbi_conn_error(th->conn,(const char**)&dbi_errstr);
217 rrd_set_error( "libdbi: problems connecting to db with connect string %s - error: %s",th->filename,dbi_errstr);
218 _sql_close(th);
219 return -1;
220 }
221 th->connected=1;
222 }
223 /* now find out regarding an existing result-set */
224 if (! th->result) {
225 /* return if table_next is NULL */
226 if (th->table_next==NULL) {
227 if (getenv("RRDDEBUGSQL")) { fprintf(stderr,"RRDDEBUGSQL: %li: reached last table to connect to\n",time(NULL) ); }
228 /* but first close connection */
229 _sql_close(th);
230 /* and return with end of data */
231 return 0;
232 }
233 /* calculate the table to use next */
234 th->table_start=th->table_next;
235 th->table_next=_find_next_separator(th->table_start,'+');
236 _inline_unescape(th->table_start);
237 /* and prepare FULL SQL Statement */
238 if (ordered) {
239 snprintf(sql,sizeof(sql)-1,"SELECT %s as rrd_time, %s as rrd_value FROM %s WHERE %s ORDER BY %s",
240 th->timestamp,th->value,th->table_start,th->where,th->timestamp);
241 } else {
242 snprintf(sql,sizeof(sql)-1,"SELECT %s as rrd_time, %s as rrd_value FROM %s WHERE %s",
243 th->timestamp,th->value,th->table_start,th->where);
244 }
245 /* and execute sql */
246 if (getenv("RRDDEBUGSQL")) { startt=time(NULL); fprintf(stderr,"RRDDEBUGSQL: %li: executing %s\n",startt,sql); }
247 th->result=dbi_conn_query(th->conn,sql);
248 if (startt) { endt=time(NULL);fprintf(stderr,"RRDDEBUGSQL: %li: timing %li\n",endt,endt-startt); }
249 /* handle error case */
250 if (! th->result) {
251 dbi_conn_error(th->conn,(const char**)&dbi_errstr);
252 if (startt) { fprintf(stderr,"RRDDEBUGSQL: %li: error %s\n",endt,dbi_errstr); }
253 rrd_set_error("libdbi: problems with query: %s - errormessage: %s",sql,dbi_errstr);
254 _sql_close(th);
255 return -1;
256 }
257 }
258 /* and now fetch key and value */
259 if (! dbi_result_next_row(th->result)) {
260 /* free result */
261 dbi_result_free(th->result);
262 th->result=NULL;
263 /* and call recursively - this will open the next table or close connection as a whole*/
264 return _sql_fetchrow(th,timestamp,value,ordered);
265 }
266 /* and return with flag for one value */
267 *timestamp=rrd_fetch_dbi_long(th->result,1);
268 *value=rrd_fetch_dbi_double(th->result,2);
269 return 1;
270 }
/*
 * Split the string at the first occurrence of separator: the separator is
 * overwritten with a terminator and a pointer to the character following it
 * is returned.  Returns NULL, leaving the string untouched, when the
 * separator does not occur.
 */
static char* _find_next_separator(char* start,char separator) {
    char *hit = strchr(start, separator);

    if (hit == NULL) {
        return NULL;
    }
    /* cut the string here and hand back the remainder */
    *hit++ = '\0';
    return hit;
}
/*
 * Split the string at the first place where separator occurs twice in a row:
 * the first of the two separators is overwritten with a terminator and a
 * pointer to the character after the second one is returned.  Returns NULL,
 * leaving the string untouched, when no doubled separator exists.
 *
 * Only real separator positions are examined.  The previous code also
 * tested start[1] while start[0] was an ordinary character (splitting
 * "a/b" at the wrong place) and read past the terminator of an empty string.
 */
static char* _find_next_separator_twice(char*start,char separator) {
    char *found;

    for (found = strchr(start, separator); found != NULL; found = strchr(found + 1, separator)) {
        if (found[1] == separator) {
            /* terminate the current string and return what follows the pair */
            *found = 0;
            return found + 2;
        }
    }
    /* not found, so return NULL */
    return NULL;
}
/*
 * Map one hexadecimal digit ('0'-'9', 'a'-'f', 'A'-'F') to its value 0..15.
 * Any other character yields -1.
 */
static char _hexcharhelper(char c) {
    if (c >= '0' && c <= '9') {
        return (char) (c - '0');
    }
    if (c >= 'a' && c <= 'f') {
        return (char) (10 + (c - 'a'));
    }
    if (c >= 'A' && c <= 'F') {
        return (char) (10 + (c - 'A'));
    }
    /* not a hex digit */
    return -1;
}
/*
 * Decode percent escapes in place: "%%" becomes '%' and "%XX" (two hex
 * digits) becomes the byte with that value.  The string only ever shrinks.
 *
 * Returns 0 on success.  Returns 1 with the rrd error set when a '%' is not
 * followed by '%' or by two hex digits; the string is then left partially
 * processed.
 */
static int _inline_unescape (char* string) {
    char *src = string;
    char *dst = string;
    char c;

    while ((c = *src) != '\0') {
        src++;
        if (c == '%') {
            if (*src == '%') {
                /* "%%": keep one '%' and skip the second */
                src += 1;
            } else {
                /* both following characters must be hex digits; the second
                   one is only looked at when the first is valid, so the
                   terminator is never stepped over */
                char h1 = _hexcharhelper(src[0]);
                char h2 = (h1 == (char) -1) ? (char) -1 : _hexcharhelper(src[1]);

                if (h1 == (char) -1 || h2 == (char) -1) {
                    rrd_set_error("string escape error at: %s\n",string);
                    return (1);
                }
                c = (char) (h2 + (h1 << 4));
                /* skip the two digits just consumed */
                src += 2;
            }
        }
        *dst = c;
        dst++;
    }
    *dst = 0;
    return 0;
}
365 int
366 rrd_fetch_fn_libdbi(
367 const char *filename, /* name of the rrd */
368 enum cf_en cf_idx __attribute__((unused)), /* consolidation function */
369 time_t *start,
370 time_t *end, /* which time frame do you want ?
371 * will be changed to represent reality */
372 unsigned long *step, /* which stepsize do you want?
373 * will be changed to represent reality */
374 unsigned long *ds_cnt, /* number of data sources in file */
375 char ***ds_namv, /* names of data_sources */
376 rrd_value_t **data) /* two dimensional array containing the data */
377 {
378 /* the separator used */
379 char separator='/';
380 /* a local copy of the filename - used for copying plus some pointer variables */
381 char filenameworkcopy[10240];
382 char *tmpptr=filenameworkcopy;
383 char *nextptr=NULL;
384 char *libdbiargs=NULL;
385 char *sqlargs=NULL;
386 /* the settings for the "works" of rrd */
387 int fillmissing=0;
388 unsigned long minstepsize=300;
389 /* by default assume unixtimestamp */
390 int isunixtime=1;
391 /* the result-set */
392 long r_timestamp,l_timestamp,d_timestamp;
393 double r_value,l_value,d_value;
394 int r_status;
395 int rows;
396 long idx;
397 int derive=0;
398 /* the libdbi connection data and the table_help structure */
399 struct sql_table_helper table_help;
400 char where[10240];
401 table_help.conn=NULL;
402 table_help.where=where;
404 /* some loop variables */
405 int i=0;
407 /* check header */
408 if (strncmp("sql",filename,3)!=0) {
409 rrd_set_error( "formatstring wrong - %s",filename );return -1;
410 }
411 if (filename[3]!=filename[4]) {
412 rrd_set_error( "formatstring wrong - %s",filename );return -1;
413 }
415 /* now make this the separator */
416 separator=filename[3];
418 /* copy filename for local modifications during parsing */
419 strncpy(filenameworkcopy,filename+5,sizeof(filenameworkcopy));
421 /* get the driver */
422 table_help.dbdriver=tmpptr;
423 libdbiargs=_find_next_separator(tmpptr,separator);
424 if (! libdbiargs) {
425 /* error in argument */
426 rrd_set_error( "formatstring wrong as we did not find \"%c\"- %s",separator,table_help.dbdriver);
427 return -1;
428 }
430 /* now find the next double separator - this defines the args to the database */
431 sqlargs=_find_next_separator_twice(libdbiargs,separator);
432 if (!sqlargs) {
433 rrd_set_error( "formatstring wrong for db arguments as we did not find \"%c%c\" in \"%s\"",separator,separator,libdbiargs);
434 return 1;
435 }
437 /* now we can start with the SQL Statement - best to start with this first,
438 as then the error-handling is easier, as we do not have to handle libdbi shutdown as well */
440 /* parse the table(s) */
441 table_help.table_start=sqlargs;
442 nextptr=_find_next_separator(table_help.table_start,separator);
443 if (! nextptr) {
444 /* error in argument */
445 rrd_set_error( "formatstring wrong - %s",tmpptr);
446 return -1;
447 }
448 /* hex-unescape the value */
449 if(_inline_unescape(table_help.table_start)) { return -1; }
451 /* parse the unix timestamp column */
452 table_help.timestamp=nextptr;
453 nextptr=_find_next_separator(nextptr,separator);
454 if (! nextptr) {
455 /* error in argument */
456 rrd_set_error( "formatstring wrong - %s",tmpptr);
457 return -1;
458 }
459 /* if we have leading '*', then we have a TIMEDATE Field*/
460 if (table_help.timestamp[0]=='*') { isunixtime=0; table_help.timestamp++; }
461 /* hex-unescape the value */
462 if(_inline_unescape(table_help.timestamp)) { return -1; }
464 /* parse the value column */
465 table_help.value=nextptr;
466 nextptr=_find_next_separator(nextptr,separator);
467 if (! nextptr) {
468 /* error in argument */
469 rrd_set_error( "formatstring wrong - %s",tmpptr);
470 return -1;
471 }
472 /* hex-unescape the value */
473 if(_inline_unescape(table_help.value)) { return -1; }
475 /* now prepare WHERE clause as empty string*/
476 where[0]=0;
478 /* and the where clause */
479 sqlargs=nextptr;
480 while(sqlargs) {
481 /* find next separator */
482 nextptr=_find_next_separator(sqlargs,separator);
483 /* now handle fields */
484 if (strcmp(sqlargs,"derive")==0) { /* the derive option with the default allowed max delta */
485 derive=600;
486 } else if (strcmp(sqlargs,"prediction")==0) {
487 rrd_set_error("argument prediction is no longer supported in a DEF - use new generic CDEF-functions instead");
488 return -1;
489 } else if (strcmp(sqlargs,"sigma")==0) {
490 rrd_set_error("argument sigma is no longer supported in a DEF - use new generic CDEF-functions instead");
491 return -1;
492 } else if (*sqlargs==0) { /* ignore empty */
493 } else { /* else add to where string */
494 if (where[0]) {strcat(where," AND ");}
495 strcat(where,sqlargs);
496 }
497 /* and continue loop with next pointer */
498 sqlargs=nextptr;
499 }
500 /* and unescape */
501 if(_inline_unescape(where)) { return -1; }
503 /* now parse LIBDBI options - this start initializing libdbi and beyond this point we need to reset the db as well in case of errors*/
504 while (libdbiargs) {
505 /* find separator */
506 nextptr=_find_next_separator(libdbiargs,separator);
507 /* now find =, separating key from value*/
508 tmpptr=_find_next_separator(libdbiargs,'=');
509 if (! tmpptr) {
510 rrd_set_error( "formatstring wrong for db arguments as we did not find \"=\" in \"%s\"",libdbiargs);
511 _sql_close(&table_help);
512 return 1;
513 }
514 /* hex-unescape the value */
515 if(_inline_unescape(tmpptr)) { return -1; }
516 /* now handle the key/value pair */
517 if (strcmp(libdbiargs,"rrdminstepsize")==0) { /* allow override for minstepsize */
518 i=atoi(tmpptr);if (i>0) { minstepsize=i; }
519 } else if (strcmp(libdbiargs,"rrdfillmissing")==0) { /* allow override for minstepsize */
520 i=atoi(tmpptr);if (i>0) { fillmissing=i; }
521 } else if (strcmp(libdbiargs,"rrdderivemaxstep")==0) { /* allow override for derived max delta */
522 i=atoi(tmpptr);if (i>0) { if (derive) { derive=i; }}
523 } else { /* store in libdbi, as these are parameters */
524 if (_sql_setparam(&table_help,libdbiargs,tmpptr)) { return -1; }
525 }
526 /* and continue loop with next pointer */
527 libdbiargs=nextptr;
528 }
530 /* and modify step if given */
531 if (*step<minstepsize) {*step=minstepsize;}
532 *start-=(*start)%(*step);
533 *end-=(*end)%(*step);
535 /* and append the SQL WHERE Clause for the timeframe calculated above (adding AND if required) */
536 if (where[0]) {strcat(where," AND ");}
537 i=strlen(where);
538 if (isunixtime) {
539 snprintf(where+i,sizeof(where)-1-i,"%li < %s AND %s < %li",*start,table_help.timestamp,table_help.timestamp,*end);
540 } else {
541 char tsstart[64];strftime(tsstart,sizeof(tsstart),"%Y-%m-%d %H:%M:%S",localtime(start));
542 char tsend[64];strftime(tsend,sizeof(tsend),"%Y-%m-%d %H:%M:%S",localtime(end));
543 snprintf(where+i,sizeof(where)-1-i,"'%s' < %s AND %s < '%s'",tsstart,table_help.timestamp,table_help.timestamp,tsend);
544 }
546 /* and now calculate the number of rows in the resultset... */
547 rows=((*end)-(*start))/(*step)+2;
549 /* define the result set variables/columns returned */
550 *ds_cnt=5;
551 *ds_namv=(char**)malloc((*ds_cnt)*sizeof(char*));
552 for (i=0;i<(int)(*ds_cnt);i++) {
553 tmpptr=(char*)malloc(sizeof(char) * DS_NAM_SIZE);
554 (*ds_namv)[i]=tmpptr;
555 /* now copy what is required */
556 switch (i) {
557 case 0: strncpy(tmpptr,"min",DS_NAM_SIZE-1); break;
558 case 1: strncpy(tmpptr,"avg",DS_NAM_SIZE-1); break;
559 case 2: strncpy(tmpptr,"max",DS_NAM_SIZE-1); break;
560 case 3: strncpy(tmpptr,"count",DS_NAM_SIZE-1); break;
561 case 4: strncpy(tmpptr,"sigma",DS_NAM_SIZE-1); break;
562 }
563 }
565 /* allocate memory for resultset (with the following columns: min,avg,max,count,sigma) */
566 i=rows * sizeof(rrd_value_t)*(*ds_cnt);
567 if (((*data) = malloc(i))==NULL){
568 /* and return error */
569 rrd_set_error("malloc failed for %i bytes",i);
570 return(-1);
571 }
572 /* and fill with NAN */
573 for(i=0;i<rows;i++) {
574 (*data)[i*(*ds_cnt)+0]=DNAN; /* MIN */
575 (*data)[i*(*ds_cnt)+1]=DNAN; /* AVG */
576 (*data)[i*(*ds_cnt)+2]=DNAN; /* MAX */
577 (*data)[i*(*ds_cnt)+3]=0; /* COUNT */
578 (*data)[i*(*ds_cnt)+4]=DNAN; /* SIGMA */
579 }
580 /* and assign undefined values for last - in case of derived calculation */
581 l_value=DNAN;l_timestamp=0;
582 /* here goes the real work processing all data */
583 while((r_status=_sql_fetchrow(&table_help,&r_timestamp,&r_value,derive))>0) {
584 /* processing of value */
585 /* calculate index for the timestamp */
586 idx=(r_timestamp-(*start))/(*step);
587 /* some out of bounds checks on idx */
588 if (idx<0) { idx=0;}
589 if (idx>rows) { idx=rows;}
590 /* and calculate derivative if necessary */
591 if (derive) {
592 /* calc deltas */
593 d_timestamp=r_timestamp-l_timestamp;
594 d_value=r_value-l_value;
595 /* assign current as last values */
596 l_timestamp=r_timestamp;
597 l_value=r_value;
598 /* assign DNAN by default for value */
599 r_value=DNAN;
600 /* check for timestamp delta to be within an acceptable range */
601 if ((d_timestamp>0)&&(d_timestamp<2*derive)) {
602 /* only handle positive delta - avoid wrap-arrounds/counter resets showing up as spikes */
603 if (d_value>0) {
604 /* and normalize to per second */
605 r_value=d_value/d_timestamp;
606 }
607 }
608 }
609 /* only add value if we have a value that is not NAN */
610 if (! isnan(r_value)) {
611 if ((*data)[idx*(*ds_cnt)+3]==0) { /* count is 0 so assign to overwrite DNAN */
612 (*data)[idx*(*ds_cnt)+0]=r_value; /* MIN */
613 (*data)[idx*(*ds_cnt)+1]=r_value; /* AVG */
614 (*data)[idx*(*ds_cnt)+2]=r_value; /* MAX */
615 (*data)[idx*(*ds_cnt)+3]=1; /* COUNT */
616 (*data)[idx*(*ds_cnt)+4]=r_value; /* SIGMA */
617 } else {
618 /* MIN */
619 if ((*data)[idx*(*ds_cnt)+0]>r_value) { (*data)[idx*(*ds_cnt)+0]=r_value; }
620 /* AVG - at this moment still sum - corrected in post processing */
621 (*data)[idx*(*ds_cnt)+1]+=r_value;
622 /* MAX */
623 if ((*data)[idx*(*ds_cnt)+2]<r_value) { (*data)[idx*(*ds_cnt)+2]=r_value; }
624 /* COUNT */
625 (*data)[idx*(*ds_cnt)+3]++;
626 /* SIGMA - at this moment still sum of squares - corrected in post processing */
627 (*data)[idx*(*ds_cnt)+4]+=r_value*r_value;
628 }
629 }
630 }
631 /* and check for negativ status, pass back immediately */
632 if (r_status==-1) { return -1; }
634 /* post processing */
635 for(idx=0;idx<rows;idx++) {
636 long count=(*data)[idx*(*ds_cnt)+3];
637 if (count>0) {
638 /* calc deviation first */
639 if (count>2) {
640 r_value=count*(*data)[idx*(*ds_cnt)+4]-(*data)[idx*(*ds_cnt)+1]*(*data)[idx*(*ds_cnt)+1];
641 if (r_value<0) {
642 r_value=DNAN;
643 } else {
644 r_value=sqrt(r_value/(count*(count-1)));
645 }
646 }
647 (*data)[idx*(*ds_cnt)+4]=r_value;
648 /* now the average */
649 (*data)[idx*(*ds_cnt)+1]/=count;
650 }
651 }
653 /* and return OK */
654 return 0;
655 }