summary | shortlog | log | commit | commitdiff | tree
raw | patch | inline | side by side (parent: 0fb622a)
raw | patch | inline | side by side (parent: 0fb622a)
author | Sebastian Harl <sh@tokkee.org> | |
Sun, 27 Apr 2008 18:58:44 +0000 (20:58 +0200) | ||
committer | Florian Forster <octo@huhu.verplant.org> | |
Wed, 30 Apr 2008 08:41:48 +0000 (10:41 +0200) |
While looking at the code for some reason, I decided to simplify and
improve large parts of it. Most notably, standard IO streams are now used
to read from the socket. This allowed to remove large parts of the code
which were used to read and buffer data from the socket so far.
Also among the changes:
* free any allocated memory
* added / improved log messages
* do not require euid == 0 to chown() the socket
Signed-off-by: Sebastian Harl <sh@tokkee.org>
Signed-off-by: Florian Forster <octo@huhu.verplant.org>
improve large parts of it. Most notably, standard IO streams are now used
to read from the socket. This allowed to remove large parts of the code
which were used to read and buffer data from the socket so far.
Also among the changes:
* free any allocated memory
* added / improved log messages
* do not require euid == 0 to chown() the socket
Signed-off-by: Sebastian Harl <sh@tokkee.org>
Signed-off-by: Florian Forster <octo@huhu.verplant.org>
src/email.c | patch | blob | history |
diff --git a/src/email.c b/src/email.c
index 869b7c36aa620cff5f7ae2f2d806d79868084eea..87daed113273227cfd3f82601fea5421159e6b67 100644 (file)
--- a/src/email.c
+++ b/src/email.c
/**
* collectd - src/email.c
- * Copyright (C) 2006,2007 Sebastian Harl
+ * Copyright (C) 2006-2008 Sebastian Harl
*
* This program is free software; you can redistribute it and/or modify it
* under the terms of the GNU General Public License as published by the
# include <grp.h>
#endif /* HAVE_GRP_H */
-#define MODULE_NAME "email"
-
-/* 256 bytes ought to be enough for anybody ;-) */
-#define BUFSIZE 256
-
#define SOCK_PATH LOCALSTATEDIR"/run/"PACKAGE_NAME"-email"
#define MAX_CONNS 5
#define MAX_CONNS_LIMIT 16384
-#define log_err(...) ERROR (MODULE_NAME": "__VA_ARGS__)
-#define log_warn(...) WARNING (MODULE_NAME": "__VA_ARGS__)
+#define log_debug(...) DEBUG ("email: "__VA_ARGS__)
+#define log_err(...) ERROR ("email: "__VA_ARGS__)
+#define log_warn(...) WARNING ("email: "__VA_ARGS__)
/*
* Private data structures
pthread_t thread;
/* socket descriptor of the current/last connection */
- int socket;
+ FILE *socket;
} collector_t;
/* linked list of pending connections */
typedef struct conn {
/* socket to read data from */
- int socket;
-
- /* buffer to read data to */
- char *buffer;
- int idx; /* current write position in buffer */
- int length; /* length of the current line, i.e. index of '\0' */
+ FILE *socket;
+ /* linked list of connections */
struct conn *next;
} conn_t;
static int config_keys_num = STATIC_ARRAY_SIZE (config_keys);
/* socket configuration */
-static char *sock_file = SOCK_PATH;
-static char *sock_group = COLLECTD_GRP_NAME;
+static char *sock_file = NULL;
+static char *sock_group = NULL;
static int sock_perms = S_IRWXU | S_IRWXG;
static int max_conns = MAX_CONNS;
static int available_collectors;
static pthread_mutex_t count_mutex = PTHREAD_MUTEX_INITIALIZER;
-static type_list_t count;
+static type_list_t list_count;
+static type_list_t list_count_copy;
static pthread_mutex_t size_mutex = PTHREAD_MUTEX_INITIALIZER;
-static type_list_t size;
+static type_list_t list_size;
+static type_list_t list_size_copy;
static pthread_mutex_t score_mutex = PTHREAD_MUTEX_INITIALIZER;
static double score;
static int score_count;
static pthread_mutex_t check_mutex = PTHREAD_MUTEX_INITIALIZER;
-static type_list_t check;
+static type_list_t list_check;
+static type_list_t list_check_copy;
/*
* Private functions
static int email_config (const char *key, const char *value)
{
if (0 == strcasecmp (key, "SocketFile")) {
+ if (NULL != sock_file)
+ free (sock_file);
sock_file = sstrdup (value);
}
else if (0 == strcasecmp (key, "SocketGroup")) {
+ if (NULL != sock_group)
+ free (sock_group);
sock_group = sstrdup (value);
}
else if (0 == strcasecmp (key, "SocketPerms")) {
return;
} /* static void type_list_incr (type_list_t *, char *) */
-/* Read a single character from the socket. If an error occurs or end-of-file
- * is reached return '\0'. */
-static char read_char (conn_t *src)
-{
- char ret = '\0';
-
- fd_set fdset;
-
- FD_ZERO (&fdset);
- FD_SET (src->socket, &fdset);
-
- if (-1 == select (src->socket + 1, &fdset, NULL, NULL, NULL)) {
- char errbuf[1024];
- log_err ("select() failed: %s",
- sstrerror (errno, errbuf, sizeof (errbuf)));
- return '\0';
- }
-
- assert (FD_ISSET (src->socket, &fdset));
-
- do {
- ssize_t len = 0;
-
- errno = 0;
- if (0 > (len = read (src->socket, (void *)&ret, 1))) {
- if (EINTR != errno) {
- char errbuf[1024];
- log_err ("read() failed: %s",
- sstrerror (errno, errbuf, sizeof (errbuf)));
- return '\0';
- }
- }
-
- if (0 == len)
- return '\0';
- } while (EINTR == errno);
- return ret;
-} /* static char read_char (conn_t *) */
-
-/* Read a single line (terminated by '\n') from the the socket.
- *
- * The return value is zero terminated and does not contain any newline
- * characters.
- *
- * If an error occurs or end-of-file is reached return NULL.
- *
- * IMPORTANT NOTE: If there is no newline character found in BUFSIZE
- * characters of the input stream, the line will will be ignored! By
- * definition we should not get any longer input lines, thus this is
- * acceptable in this case ;-) */
-static char *read_line (conn_t *src)
-{
- int i = 0;
-
- assert ((BUFSIZE >= src->idx) && (src->idx >= 0));
- assert ((src->idx > src->length) || (src->length == 0));
-
- if (src->length > 0) { /* remove old line */
- src->idx -= (src->length + 1);
- memmove (src->buffer, src->buffer + src->length + 1, src->idx);
- src->length = 0;
- }
-
- for (i = 0; i < src->idx; ++i) {
- if ('\n' == src->buffer[i])
- break;
- }
-
- if (i == src->idx) {
- fd_set fdset;
-
- ssize_t len = 0;
-
- FD_ZERO (&fdset);
- FD_SET (src->socket, &fdset);
-
- if (-1 == select (src->socket + 1, &fdset, NULL, NULL, NULL)) {
- char errbuf[1024];
- log_err ("select() failed: %s",
- sstrerror (errno, errbuf, sizeof (errbuf)));
- return NULL;
- }
-
- assert (FD_ISSET (src->socket, &fdset));
-
- do {
- errno = 0;
- if (0 > (len = read (src->socket,
- (void *)(src->buffer + src->idx),
- BUFSIZE - src->idx))) {
- if (EINTR != errno) {
- char errbuf[1024];
- log_err ("read() failed: %s",
- sstrerror (errno, errbuf, sizeof (errbuf)));
- return NULL;
- }
- }
-
- if (0 == len)
- return NULL;
- } while (EINTR == errno);
-
- src->idx += len;
-
- for (i = src->idx - len; i < src->idx; ++i) {
- if ('\n' == src->buffer[i])
- break;
- }
-
- if (i == src->idx) {
- src->length = 0;
-
- if (BUFSIZE == src->idx) { /* no space left in buffer */
- while ('\n' != read_char (src))
- /* ignore complete line */;
-
- src->idx = 0;
- }
- return read_line (src);
- }
- }
-
- src->buffer[i] = '\0';
- src->length = i;
-
- return src->buffer;
-} /* static char *read_line (conn_t *) */
-
static void *collect (void *arg)
{
collector_t *this = (collector_t *)arg;
-
- char *buffer = (char *)smalloc (BUFSIZE);
+ pthread_t self = pthread_self ();
while (1) {
int loop = 1;
conns.tail = NULL;
}
- this->socket = connection->socket;
-
pthread_mutex_unlock (&conns_mutex);
- connection->buffer = buffer;
- connection->idx = 0;
- connection->length = 0;
+ /* make the socket available to the global
+ * thread and connection management */
+ this->socket = connection->socket;
- { /* put the socket in non-blocking mode */
- int flags = 0;
+ log_debug ("[thread #%5lu] handling connection on fd #%i",
+ self, fileno (this->socket));
- errno = 0;
- if (-1 == fcntl (connection->socket, F_GETFL, &flags)) {
- char errbuf[1024];
- log_err ("fcntl() failed: %s",
- sstrerror (errno, errbuf, sizeof (errbuf)));
- loop = 0;
- }
+ while (loop) {
+ /* 256 bytes ought to be enough for anybody ;-) */
+ char line[256 + 1]; /* line + '\0' */
+ int len = 0;
errno = 0;
- if (-1 == fcntl (connection->socket, F_SETFL, flags | O_NONBLOCK)) {
- char errbuf[1024];
- log_err ("fcntl() failed: %s",
- sstrerror (errno, errbuf, sizeof (errbuf)));
+ if (NULL == fgets (line, sizeof (line), this->socket)) {
loop = 0;
+
+ if (0 != errno) {
+ char errbuf[1024];
+ log_err ("[thread #%5lu] reading from socket (fd #%i) "
+ "failed: %s", self, fileno (this->socket),
+ sstrerror (errno, errbuf, sizeof (errbuf)));
+ }
+ break;
}
- }
- while (loop) {
- char *line = read_line (connection);
+ len = strlen (line);
+ if (('\n' != line[len - 1]) && ('\r' != line[len - 1])) {
+ log_warn ("[thread #%5lu] line too long (> %i characters): "
+ "'%s' (truncated)", self, sizeof (line) - 1, line);
- if (NULL == line) {
- loop = 0;
- break;
+ while (NULL != fgets (line, sizeof (line), this->socket))
+ if (('\n' == line[len - 1]) || ('\r' == line[len - 1]))
+ break;
+ continue;
}
+ line[len - 1] = '\0';
+
+ log_debug ("[thread #%5lu] line = '%s'", self, line);
+
if (':' != line[1]) {
- log_err ("syntax error in line '%s'", line);
+ log_err ("[thread #%5lu] syntax error in line '%s'",
+ self, line);
continue;
}
int bytes = 0;
if (NULL == tmp) {
- log_err ("syntax error in line '%s'", line);
+ log_err ("[thread #%5lu] syntax error in line '%s'",
+ self, line);
continue;
}
bytes = atoi (tmp);
pthread_mutex_lock (&count_mutex);
- type_list_incr (&count, type, 1);
+ type_list_incr (&list_count, type, 1);
pthread_mutex_unlock (&count_mutex);
if (bytes > 0) {
pthread_mutex_lock (&size_mutex);
- type_list_incr (&size, type, bytes);
+ type_list_incr (&list_size, type, bytes);
pthread_mutex_unlock (&size_mutex);
}
}
do {
pthread_mutex_lock (&check_mutex);
- type_list_incr (&check, type, 1);
+ type_list_incr (&list_check, type, 1);
pthread_mutex_unlock (&check_mutex);
} while (NULL != (type = strtok_r (NULL, ",", &ptr)));
}
else {
- log_err ("unknown type '%c'", line[0]);
+ log_err ("[thread #%5lu] unknown type '%c'", self, line[0]);
}
} /* while (loop) */
- close (connection->socket);
+ log_debug ("[thread #%5lu] shutting down connection on fd #%i",
+ pthread_self (), fileno (this->socket));
+
+ fclose (connection->socket);
free (connection);
- this->socket = -1;
+ this->socket = NULL;
pthread_mutex_lock (&available_mutex);
++available_collectors;
pthread_cond_signal (&collector_available);
} /* while (1) */
- free (buffer);
pthread_exit ((void *)0);
} /* static void *collect (void *) */
{
struct sockaddr_un addr;
+ char *path = (NULL == sock_file) ? SOCK_PATH : sock_file;
+ char *group = (NULL == sock_group) ? COLLECTD_GRP_NAME : sock_group;
+
/* create UNIX socket */
errno = 0;
if (-1 == (connector_socket = socket (PF_UNIX, SOCK_STREAM, 0))) {
addr.sun_family = AF_UNIX;
- strncpy (addr.sun_path, sock_file, (size_t)(UNIX_PATH_MAX - 1));
+ strncpy (addr.sun_path, path, (size_t)(UNIX_PATH_MAX - 1));
addr.sun_path[UNIX_PATH_MAX - 1] = '\0';
unlink (addr.sun_path);
+ strlen(addr.sun_path))) {
char errbuf[1024];
disabled = 1;
- connector_socket = -1; /* TODO: close? */
+ close (connector_socket);
+ connector_socket = -1;
log_err ("bind() failed: %s",
sstrerror (errno, errbuf, sizeof (errbuf)));
pthread_exit ((void *)1);
if (-1 == listen (connector_socket, 5)) {
char errbuf[1024];
disabled = 1;
- connector_socket = -1; /* TODO: close? */
+ close (connector_socket);
+ connector_socket = -1;
log_err ("listen() failed: %s",
sstrerror (errno, errbuf, sizeof (errbuf)));
pthread_exit ((void *)1);
}
- if ((uid_t) 0 == geteuid ())
{
struct group sg;
struct group *grp;
int status;
grp = NULL;
- status = getgrnam_r (sock_group, &sg, grbuf, sizeof (grbuf), &grp);
+ status = getgrnam_r (group, &sg, grbuf, sizeof (grbuf), &grp);
if (status != 0)
{
char errbuf[1024];
- log_warn ("getgrnam_r (%s) failed: %s", sock_group,
+ log_warn ("getgrnam_r (%s) failed: %s", group,
sstrerror (errno, errbuf, sizeof (errbuf)));
}
else if (grp == NULL)
{
- log_warn ("No such group: `%s'", sock_group);
+ log_warn ("No such group: `%s'", group);
}
else
{
- status = chown (sock_file, (uid_t) -1, grp->gr_gid);
+ status = chown (path, (uid_t) -1, grp->gr_gid);
if (status != 0)
{
char errbuf[1024];
log_warn ("chown (%s, -1, %i) failed: %s",
- sock_file, (int) grp->gr_gid,
+ path, (int) grp->gr_gid,
sstrerror (errno, errbuf, sizeof (errbuf)));
}
}
}
- else /* geteuid != 0 */
- {
- log_warn ("not running as root");
- }
errno = 0;
- if (0 != chmod (sock_file, sock_perms)) {
+ if (0 != chmod (path, sock_perms)) {
char errbuf[1024];
log_warn ("chmod() failed: %s",
sstrerror (errno, errbuf, sizeof (errbuf)));
for (i = 0; i < max_conns; ++i) {
collectors[i] = (collector_t *)smalloc (sizeof (collector_t));
- collectors[i]->socket = -1;
+ collectors[i]->socket = NULL;
if (0 != (err = pthread_create (&collectors[i]->thread, &ptattr,
collect, collectors[i]))) {
if (EINTR != errno) {
char errbuf[1024];
disabled = 1;
- connector_socket = -1; /* TODO: close? */
+ close (connector_socket);
+ connector_socket = -1;
log_err ("accept() failed: %s",
sstrerror (errno, errbuf, sizeof (errbuf)));
pthread_exit ((void *)1);
connection = (conn_t *)smalloc (sizeof (conn_t));
- connection->socket = remote;
+ connection->socket = fdopen (remote, "r");
connection->next = NULL;
+ if (NULL == connection->socket) {
+ close (remote);
+ continue;
+ }
+
pthread_mutex_lock (&conns_mutex);
if (NULL == conns.head) {
static int email_shutdown (void)
{
+ type_t *ptr = NULL;
+
int i = 0;
if (connector != ((pthread_t) 0)) {
/* don't allow any more connections to be processed */
pthread_mutex_lock (&conns_mutex);
+ available_collectors = 0;
+
if (collectors != NULL) {
for (i = 0; i < max_conns; ++i) {
if (collectors[i] == NULL)
collectors[i]->thread = (pthread_t) 0;
}
- if (collectors[i]->socket >= 0) {
- close (collectors[i]->socket);
- collectors[i]->socket = -1;
+ if (collectors[i]->socket != NULL) {
+ fclose (collectors[i]->socket);
+ collectors[i]->socket = NULL;
}
+
+ sfree (collectors[i]);
}
+ sfree (collectors);
} /* if (collectors != NULL) */
pthread_mutex_unlock (&conns_mutex);
- unlink (sock_file);
- errno = 0;
+ for (ptr = list_count.head; NULL != ptr; ptr = ptr->next) {
+ free (ptr->name);
+ free (ptr);
+ }
+
+ for (ptr = list_count_copy.head; NULL != ptr; ptr = ptr->next) {
+ free (ptr->name);
+ free (ptr);
+ }
+
+ for (ptr = list_size.head; NULL != ptr; ptr = ptr->next) {
+ free (ptr->name);
+ free (ptr);
+ }
+ for (ptr = list_size_copy.head; NULL != ptr; ptr = ptr->next) {
+ free (ptr->name);
+ free (ptr);
+ }
+
+ for (ptr = list_check.head; NULL != ptr; ptr = ptr->next) {
+ free (ptr->name);
+ free (ptr);
+ }
+
+ for (ptr = list_check_copy.head; NULL != ptr; ptr = ptr->next) {
+ free (ptr->name);
+ free (ptr);
+ }
+
+ unlink ((NULL == sock_file) ? SOCK_PATH : sock_file);
+
+ sfree (sock_file);
+ sfree (sock_group);
return (0);
} /* static void email_shutdown (void) */
double score_old;
int score_count_old;
- static type_list_t *cnt;
- static type_list_t *sz;
- static type_list_t *chk;
-
if (disabled)
return (-1);
- if (NULL == cnt) {
- cnt = (type_list_t *)smalloc (sizeof (type_list_t));
- cnt->head = NULL;
- }
-
- if (NULL == sz) {
- sz = (type_list_t *)smalloc (sizeof (type_list_t));
- sz->head = NULL;
- }
-
- if (NULL == chk) {
- chk = (type_list_t *)smalloc (sizeof (type_list_t));
- chk->head = NULL;
- }
-
/* email count */
pthread_mutex_lock (&count_mutex);
- copy_type_list (&count, cnt);
+ copy_type_list (&list_count, &list_count_copy);
pthread_mutex_unlock (&count_mutex);
- for (ptr = cnt->head; NULL != ptr; ptr = ptr->next) {
+ for (ptr = list_count_copy.head; NULL != ptr; ptr = ptr->next) {
email_submit ("email_count", ptr->name, ptr->value);
}
/* email size */
pthread_mutex_lock (&size_mutex);
- copy_type_list (&size, sz);
+ copy_type_list (&list_size, &list_size_copy);
pthread_mutex_unlock (&size_mutex);
- for (ptr = sz->head; NULL != ptr; ptr = ptr->next) {
+ for (ptr = list_size_copy.head; NULL != ptr; ptr = ptr->next) {
email_submit ("email_size", ptr->name, ptr->value);
}
/* spam checks */
pthread_mutex_lock (&check_mutex);
- copy_type_list (&check, chk);
+ copy_type_list (&list_check, &list_check_copy);
pthread_mutex_unlock (&check_mutex);
- for (ptr = chk->head; NULL != ptr; ptr = ptr->next)
+ for (ptr = list_check_copy.head; NULL != ptr; ptr = ptr->next)
email_submit ("spam_check", ptr->name, ptr->value);
return (0);