1 /**
2 * collectd - src/email.c
3 * Copyright (C) 2006-2008 Sebastian Harl
4 *
5 * Permission is hereby granted, free of charge, to any person obtaining a
6 * copy of this software and associated documentation files (the "Software"),
7 * to deal in the Software without restriction, including without limitation
8 * the rights to use, copy, modify, merge, publish, distribute, sublicense,
9 * and/or sell copies of the Software, and to permit persons to whom the
10 * Software is furnished to do so, subject to the following conditions:
11 *
12 * The above copyright notice and this permission notice shall be included in
13 * all copies or substantial portions of the Software.
14 *
15 * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
16 * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
17 * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
18 * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
19 * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
20 * FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
21 * DEALINGS IN THE SOFTWARE.
22 *
23 * Authors:
24 * Sebastian Harl <sh at tokkee.org>
25 **/
27 /*
28 * This plugin communicates with a spam filter, a virus scanner or similar
29 * software using a UNIX socket and a very simple protocol:
30 *
31 * e-mail type (e.g. ham, spam, virus, ...) and size
32 * e:<type>:<bytes>
33 *
34 * spam score
35 * s:<value>
36 *
37 * successful spam checks
38 * c:<type1>[,<type2>,...]
39 */
41 #include "collectd.h"
43 #include "common.h"
44 #include "plugin.h"
46 #include <stddef.h>
48 #include <sys/select.h>
49 #include <sys/un.h>
/* Not every platform provides UNIX_PATH_MAX (Darwin, for one, appears not
 * to). Fall back to the capacity of the sun_path member of sockaddr_un. */
#if !defined(UNIX_PATH_MAX)
#define UNIX_PATH_MAX sizeof(((struct sockaddr_un *)0)->sun_path)
#endif /* !defined(UNIX_PATH_MAX) */
56 #if HAVE_GRP_H
57 #include <grp.h>
58 #endif /* HAVE_GRP_H */
/* Socket path used when no `SocketFile' option has been configured. */
#define SOCK_PATH LOCALSTATEDIR "/run/" PACKAGE_NAME "-email"

/* Default for the `MaxConns' option and the upper bound accepted for it. */
#define MAX_CONNS 5
#define MAX_CONNS_LIMIT 16384

/* Logging shortcuts that prefix every message with the plugin name. The
 * first argument has to be a string literal because it is concatenated with
 * the prefix at compile time. */
#define log_debug(...) DEBUG("email: " __VA_ARGS__)
#define log_err(...) ERROR("email: " __VA_ARGS__)
#define log_warn(...) WARNING("email: " __VA_ARGS__)
68 /*
69 * Private data structures
70 */
/* One node of a singly linked list that maps an e-mail or check type name
 * to an integer value. */
struct type {
  char *name;        /* type name (string owned by the node) */
  int value;         /* value accumulated for this name */
  struct type *next; /* following node, NULL at the end of the list */
};
typedef struct type type_t;

/* List handle: first and last node, both NULL while the list is empty. */
typedef struct {
  type_t *head;
  type_t *tail;
} type_list_t;
/* Per-thread bookkeeping for one collector thread. */
struct collector {
  /* handle of the collector thread */
  pthread_t thread;

  /* stream of the connection currently (or most recently) handled by the
   * thread; NULL while the thread has no connection */
  FILE *socket;
};
typedef struct collector collector_t;
/* One accepted connection that is waiting for a collector thread. */
struct conn {
  /* stream the protocol lines are read from */
  FILE *socket;

  /* next pending connection, NULL at the end of the queue */
  struct conn *next;
};
typedef struct conn conn_t;

/* Queue handle: first and last pending connection, both NULL when empty. */
typedef struct {
  conn_t *head;
  conn_t *tail;
} conn_list_t;
105 /*
106 * Private variables
107 */
108 /* valid configuration file keys */
109 static const char *config_keys[] = {"SocketFile", "SocketGroup", "SocketPerms",
110 "MaxConns"};
111 static int config_keys_num = STATIC_ARRAY_SIZE(config_keys);
113 /* socket configuration */
114 static char *sock_file = NULL;
115 static char *sock_group = NULL;
116 static int sock_perms = S_IRWXU | S_IRWXG;
117 static int max_conns = MAX_CONNS;
119 /* state of the plugin */
120 static int disabled = 0;
122 /* thread managing "client" connections */
123 static pthread_t connector = (pthread_t)0;
124 static int connector_socket = -1;
126 /* tell the collector threads that a new connection is available */
127 static pthread_cond_t conn_available = PTHREAD_COND_INITIALIZER;
129 /* connections that are waiting to be processed */
130 static pthread_mutex_t conns_mutex = PTHREAD_MUTEX_INITIALIZER;
131 static conn_list_t conns;
133 /* tell the connector thread that a collector is available */
134 static pthread_cond_t collector_available = PTHREAD_COND_INITIALIZER;
136 /* collector threads */
137 static collector_t **collectors = NULL;
139 static pthread_mutex_t available_mutex = PTHREAD_MUTEX_INITIALIZER;
140 static int available_collectors;
142 static pthread_mutex_t count_mutex = PTHREAD_MUTEX_INITIALIZER;
143 static type_list_t list_count;
144 static type_list_t list_count_copy;
146 static pthread_mutex_t size_mutex = PTHREAD_MUTEX_INITIALIZER;
147 static type_list_t list_size;
148 static type_list_t list_size_copy;
150 static pthread_mutex_t score_mutex = PTHREAD_MUTEX_INITIALIZER;
151 static double score;
152 static int score_count;
154 static pthread_mutex_t check_mutex = PTHREAD_MUTEX_INITIALIZER;
155 static type_list_t list_check;
156 static type_list_t list_check_copy;
158 /*
159 * Private functions
160 */
161 static int email_config(const char *key, const char *value) {
162 if (0 == strcasecmp(key, "SocketFile")) {
163 if (NULL != sock_file)
164 free(sock_file);
165 sock_file = sstrdup(value);
166 } else if (0 == strcasecmp(key, "SocketGroup")) {
167 if (NULL != sock_group)
168 free(sock_group);
169 sock_group = sstrdup(value);
170 } else if (0 == strcasecmp(key, "SocketPerms")) {
171 /* the user is responsible for providing reasonable values */
172 sock_perms = (int)strtol(value, NULL, 8);
173 } else if (0 == strcasecmp(key, "MaxConns")) {
174 long int tmp = strtol(value, NULL, 0);
176 if (tmp < 1) {
177 fprintf(stderr, "email plugin: `MaxConns' was set to invalid "
178 "value %li, will use default %i.\n",
179 tmp, MAX_CONNS);
180 ERROR("email plugin: `MaxConns' was set to invalid "
181 "value %li, will use default %i.\n",
182 tmp, MAX_CONNS);
183 max_conns = MAX_CONNS;
184 } else if (tmp > MAX_CONNS_LIMIT) {
185 fprintf(stderr, "email plugin: `MaxConns' was set to invalid "
186 "value %li, will use hardcoded limit %i.\n",
187 tmp, MAX_CONNS_LIMIT);
188 ERROR("email plugin: `MaxConns' was set to invalid "
189 "value %li, will use hardcoded limit %i.\n",
190 tmp, MAX_CONNS_LIMIT);
191 max_conns = MAX_CONNS_LIMIT;
192 } else {
193 max_conns = (int)tmp;
194 }
195 } else {
196 return -1;
197 }
198 return 0;
199 } /* static int email_config (char *, char *) */
201 /* Increment the value of the given name in the given list by incr. */
202 static void type_list_incr(type_list_t *list, char *name, int incr) {
203 if (NULL == list->head) {
204 list->head = smalloc(sizeof(*list->head));
206 list->head->name = sstrdup(name);
207 list->head->value = incr;
208 list->head->next = NULL;
210 list->tail = list->head;
211 } else {
212 type_t *ptr;
214 for (ptr = list->head; NULL != ptr; ptr = ptr->next) {
215 if (0 == strcmp(name, ptr->name))
216 break;
217 }
219 if (NULL == ptr) {
220 list->tail->next = smalloc(sizeof(*list->tail->next));
221 list->tail = list->tail->next;
223 list->tail->name = sstrdup(name);
224 list->tail->value = incr;
225 list->tail->next = NULL;
226 } else {
227 ptr->value += incr;
228 }
229 }
230 return;
231 } /* static void type_list_incr (type_list_t *, char *) */
233 static void *collect(void *arg) {
234 collector_t *this = (collector_t *)arg;
236 while (1) {
237 conn_t *connection;
239 pthread_mutex_lock(&conns_mutex);
241 while (NULL == conns.head) {
242 pthread_cond_wait(&conn_available, &conns_mutex);
243 }
245 connection = conns.head;
246 conns.head = conns.head->next;
248 if (NULL == conns.head) {
249 conns.tail = NULL;
250 }
252 pthread_mutex_unlock(&conns_mutex);
254 /* make the socket available to the global
255 * thread and connection management */
256 this->socket = connection->socket;
258 log_debug("collect: handling connection on fd #%i", fileno(this->socket));
260 while (42) {
261 /* 256 bytes ought to be enough for anybody ;-) */
262 char line[256 + 1]; /* line + '\0' */
263 int len = 0;
265 errno = 0;
266 if (NULL == fgets(line, sizeof(line), this->socket)) {
267 if (0 != errno) {
268 char errbuf[1024];
269 log_err("collect: reading from socket (fd #%i) "
270 "failed: %s",
271 fileno(this->socket),
272 sstrerror(errno, errbuf, sizeof(errbuf)));
273 }
274 break;
275 }
277 len = strlen(line);
278 if (('\n' != line[len - 1]) && ('\r' != line[len - 1])) {
279 log_warn("collect: line too long (> %zu characters): "
280 "'%s' (truncated)",
281 sizeof(line) - 1, line);
283 while (NULL != fgets(line, sizeof(line), this->socket))
284 if (('\n' == line[len - 1]) || ('\r' == line[len - 1]))
285 break;
286 continue;
287 }
288 if (len < 3) { /* [a-z] ':' '\n' */
289 continue;
290 }
292 line[len - 1] = 0;
294 log_debug("collect: line = '%s'", line);
296 if (':' != line[1]) {
297 log_err("collect: syntax error in line '%s'", line);
298 continue;
299 }
301 if ('e' == line[0]) { /* e:<type>:<bytes> */
302 char *ptr = NULL;
303 char *type = strtok_r(line + 2, ":", &ptr);
304 char *tmp = strtok_r(NULL, ":", &ptr);
305 int bytes = 0;
307 if (NULL == tmp) {
308 log_err("collect: syntax error in line '%s'", line);
309 continue;
310 }
312 bytes = atoi(tmp);
314 pthread_mutex_lock(&count_mutex);
315 type_list_incr(&list_count, type, /* increment = */ 1);
316 pthread_mutex_unlock(&count_mutex);
318 if (bytes > 0) {
319 pthread_mutex_lock(&size_mutex);
320 type_list_incr(&list_size, type, /* increment = */ bytes);
321 pthread_mutex_unlock(&size_mutex);
322 }
323 } else if ('s' == line[0]) { /* s:<value> */
324 pthread_mutex_lock(&score_mutex);
325 score = (score * (double)score_count + atof(line + 2)) /
326 (double)(score_count + 1);
327 ++score_count;
328 pthread_mutex_unlock(&score_mutex);
329 } else if ('c' == line[0]) { /* c:<type1>[,<type2>,...] */
330 char *dummy = line + 2;
331 char *endptr = NULL;
332 char *type;
334 pthread_mutex_lock(&check_mutex);
335 while ((type = strtok_r(dummy, ",", &endptr)) != NULL) {
336 dummy = NULL;
337 type_list_incr(&list_check, type, /* increment = */ 1);
338 }
339 pthread_mutex_unlock(&check_mutex);
340 } else {
341 log_err("collect: unknown type '%c'", line[0]);
342 }
343 } /* while (42) */
345 log_debug("Shutting down connection on fd #%i", fileno(this->socket));
347 fclose(connection->socket);
348 free(connection);
350 this->socket = NULL;
352 pthread_mutex_lock(&available_mutex);
353 ++available_collectors;
354 pthread_mutex_unlock(&available_mutex);
356 pthread_cond_signal(&collector_available);
357 } /* while (1) */
359 pthread_exit((void *)0);
360 return ((void *)0);
361 } /* static void *collect (void *) */
363 static void *open_connection(void __attribute__((unused)) * arg) {
364 const char *path = (NULL == sock_file) ? SOCK_PATH : sock_file;
365 const char *group = (NULL == sock_group) ? COLLECTD_GRP_NAME : sock_group;
367 /* create UNIX socket */
368 errno = 0;
369 if (-1 == (connector_socket = socket(PF_UNIX, SOCK_STREAM, 0))) {
370 char errbuf[1024];
371 disabled = 1;
372 log_err("socket() failed: %s", sstrerror(errno, errbuf, sizeof(errbuf)));
373 pthread_exit((void *)1);
374 }
376 struct sockaddr_un addr = {
377 .sun_family = AF_UNIX
378 };
379 sstrncpy(addr.sun_path, path, (size_t)(UNIX_PATH_MAX - 1));
381 errno = 0;
382 if (-1 ==
383 bind(connector_socket, (struct sockaddr *)&addr,
384 offsetof(struct sockaddr_un, sun_path) + strlen(addr.sun_path))) {
385 char errbuf[1024];
386 disabled = 1;
387 close(connector_socket);
388 connector_socket = -1;
389 log_err("bind() failed: %s", sstrerror(errno, errbuf, sizeof(errbuf)));
390 pthread_exit((void *)1);
391 }
393 errno = 0;
394 if (-1 == listen(connector_socket, 5)) {
395 char errbuf[1024];
396 disabled = 1;
397 close(connector_socket);
398 connector_socket = -1;
399 log_err("listen() failed: %s", sstrerror(errno, errbuf, sizeof(errbuf)));
400 pthread_exit((void *)1);
401 }
403 {
404 struct group sg;
405 struct group *grp;
406 char grbuf[2048];
407 int status;
409 grp = NULL;
410 status = getgrnam_r(group, &sg, grbuf, sizeof(grbuf), &grp);
411 if (status != 0) {
412 char errbuf[1024];
413 log_warn("getgrnam_r (%s) failed: %s", group,
414 sstrerror(errno, errbuf, sizeof(errbuf)));
415 } else if (grp == NULL) {
416 log_warn("No such group: `%s'", group);
417 } else {
418 status = chown(path, (uid_t)-1, grp->gr_gid);
419 if (status != 0) {
420 char errbuf[1024];
421 log_warn("chown (%s, -1, %i) failed: %s", path, (int)grp->gr_gid,
422 sstrerror(errno, errbuf, sizeof(errbuf)));
423 }
424 }
425 }
427 errno = 0;
428 if (0 != chmod(path, sock_perms)) {
429 char errbuf[1024];
430 log_warn("chmod() failed: %s", sstrerror(errno, errbuf, sizeof(errbuf)));
431 }
433 { /* initialize collector threads */
434 pthread_attr_t ptattr;
436 conns.head = NULL;
437 conns.tail = NULL;
439 pthread_attr_init(&ptattr);
440 pthread_attr_setdetachstate(&ptattr, PTHREAD_CREATE_DETACHED);
442 available_collectors = max_conns;
444 collectors = smalloc(max_conns * sizeof(*collectors));
446 for (int i = 0; i < max_conns; ++i) {
447 collectors[i] = smalloc(sizeof(*collectors[i]));
448 collectors[i]->socket = NULL;
450 if (plugin_thread_create(&collectors[i]->thread, &ptattr, collect,
451 collectors[i], "email collector") != 0) {
452 char errbuf[1024];
453 log_err("plugin_thread_create() failed: %s",
454 sstrerror(errno, errbuf, sizeof(errbuf)));
455 collectors[i]->thread = (pthread_t)0;
456 }
457 }
459 pthread_attr_destroy(&ptattr);
460 }
462 while (1) {
463 int remote = 0;
465 conn_t *connection;
467 pthread_mutex_lock(&available_mutex);
469 while (0 == available_collectors) {
470 pthread_cond_wait(&collector_available, &available_mutex);
471 }
473 --available_collectors;
475 pthread_mutex_unlock(&available_mutex);
477 while (42) {
478 errno = 0;
480 remote = accept(connector_socket, NULL, NULL);
481 if (remote == -1) {
482 char errbuf[1024];
484 if (errno == EINTR)
485 continue;
487 disabled = 1;
488 close(connector_socket);
489 connector_socket = -1;
490 log_err("accept() failed: %s",
491 sstrerror(errno, errbuf, sizeof(errbuf)));
492 pthread_exit((void *)1);
493 }
495 /* access() succeeded. */
496 break;
497 }
499 connection = calloc(1, sizeof(*connection));
500 if (connection == NULL) {
501 close(remote);
502 continue;
503 }
505 connection->socket = fdopen(remote, "r");
506 connection->next = NULL;
508 if (NULL == connection->socket) {
509 close(remote);
510 sfree(connection);
511 continue;
512 }
514 pthread_mutex_lock(&conns_mutex);
516 if (NULL == conns.head) {
517 conns.head = connection;
518 conns.tail = connection;
519 } else {
520 conns.tail->next = connection;
521 conns.tail = conns.tail->next;
522 }
524 pthread_mutex_unlock(&conns_mutex);
526 pthread_cond_signal(&conn_available);
527 }
529 pthread_exit((void *)0);
530 return ((void *)0);
531 } /* static void *open_connection (void *) */
533 static int email_init(void) {
534 if (plugin_thread_create(&connector, NULL, open_connection, NULL,
535 "email listener") != 0) {
536 char errbuf[1024];
537 disabled = 1;
538 log_err("plugin_thread_create() failed: %s",
539 sstrerror(errno, errbuf, sizeof(errbuf)));
540 return (-1);
541 }
543 return (0);
544 } /* int email_init */
546 static void type_list_free(type_list_t *t) {
547 type_t *this;
549 this = t->head;
550 while (this != NULL) {
551 type_t *next = this->next;
553 sfree(this->name);
554 sfree(this);
556 this = next;
557 }
559 t->head = NULL;
560 t->tail = NULL;
561 }
563 static int email_shutdown(void) {
564 if (connector != ((pthread_t)0)) {
565 pthread_kill(connector, SIGTERM);
566 connector = (pthread_t)0;
567 }
569 if (connector_socket >= 0) {
570 close(connector_socket);
571 connector_socket = -1;
572 }
574 /* don't allow any more connections to be processed */
575 pthread_mutex_lock(&conns_mutex);
577 available_collectors = 0;
579 if (collectors != NULL) {
580 for (int i = 0; i < max_conns; ++i) {
581 if (collectors[i] == NULL)
582 continue;
584 if (collectors[i]->thread != ((pthread_t)0)) {
585 pthread_kill(collectors[i]->thread, SIGTERM);
586 collectors[i]->thread = (pthread_t)0;
587 }
589 if (collectors[i]->socket != NULL) {
590 fclose(collectors[i]->socket);
591 collectors[i]->socket = NULL;
592 }
594 sfree(collectors[i]);
595 }
596 sfree(collectors);
597 } /* if (collectors != NULL) */
599 pthread_mutex_unlock(&conns_mutex);
601 type_list_free(&list_count);
602 type_list_free(&list_count_copy);
603 type_list_free(&list_size);
604 type_list_free(&list_size_copy);
605 type_list_free(&list_check);
606 type_list_free(&list_check_copy);
608 unlink((NULL == sock_file) ? SOCK_PATH : sock_file);
610 sfree(sock_file);
611 sfree(sock_group);
612 return (0);
613 } /* static void email_shutdown (void) */
615 static void email_submit(const char *type, const char *type_instance,
616 gauge_t value) {
617 value_list_t vl = VALUE_LIST_INIT;
619 vl.values = &(value_t){.gauge = value};
620 vl.values_len = 1;
621 sstrncpy(vl.plugin, "email", sizeof(vl.plugin));
622 sstrncpy(vl.type, type, sizeof(vl.type));
623 sstrncpy(vl.type_instance, type_instance, sizeof(vl.type_instance));
625 plugin_dispatch_values(&vl);
626 } /* void email_submit */
628 /* Copy list l1 to list l2. l2 may partly exist already, but it is assumed
629 * that neither the order nor the name of any element of either list is
630 * changed and no elements are deleted. The values of l1 are reset to zero
631 * after they have been copied to l2. */
632 static void copy_type_list(type_list_t *l1, type_list_t *l2) {
633 type_t *last = NULL;
635 for (type_t *ptr1 = l1->head, *ptr2 = l2->head; NULL != ptr1;
636 ptr1 = ptr1->next, last = ptr2, ptr2 = ptr2->next) {
637 if (NULL == ptr2) {
638 ptr2 = smalloc(sizeof(*ptr2));
639 ptr2->name = NULL;
640 ptr2->next = NULL;
642 if (NULL == last) {
643 l2->head = ptr2;
644 } else {
645 last->next = ptr2;
646 }
648 l2->tail = ptr2;
649 }
651 if (NULL == ptr2->name) {
652 ptr2->name = sstrdup(ptr1->name);
653 }
655 ptr2->value = ptr1->value;
656 ptr1->value = 0;
657 }
658 return;
659 }
661 static int email_read(void) {
662 double score_old;
663 int score_count_old;
665 if (disabled)
666 return (-1);
668 /* email count */
669 pthread_mutex_lock(&count_mutex);
671 copy_type_list(&list_count, &list_count_copy);
673 pthread_mutex_unlock(&count_mutex);
675 for (type_t *ptr = list_count_copy.head; NULL != ptr; ptr = ptr->next) {
676 email_submit("email_count", ptr->name, ptr->value);
677 }
679 /* email size */
680 pthread_mutex_lock(&size_mutex);
682 copy_type_list(&list_size, &list_size_copy);
684 pthread_mutex_unlock(&size_mutex);
686 for (type_t *ptr = list_size_copy.head; NULL != ptr; ptr = ptr->next) {
687 email_submit("email_size", ptr->name, ptr->value);
688 }
690 /* spam score */
691 pthread_mutex_lock(&score_mutex);
693 score_old = score;
694 score_count_old = score_count;
695 score = 0.0;
696 score_count = 0;
698 pthread_mutex_unlock(&score_mutex);
700 if (score_count_old > 0)
701 email_submit("spam_score", "", score_old);
703 /* spam checks */
704 pthread_mutex_lock(&check_mutex);
706 copy_type_list(&list_check, &list_check_copy);
708 pthread_mutex_unlock(&check_mutex);
710 for (type_t *ptr = list_check_copy.head; NULL != ptr; ptr = ptr->next)
711 email_submit("spam_check", ptr->name, ptr->value);
713 return (0);
714 } /* int email_read */
716 void module_register(void) {
717 plugin_register_config("email", email_config, config_keys, config_keys_num);
718 plugin_register_init("email", email_init);
719 plugin_register_read("email", email_read);
720 plugin_register_shutdown("email", email_shutdown);
721 } /* void module_register */
723 /* vim: set sw=4 ts=4 tw=78 noexpandtab : */