Code

email plugin: Use a thread pool.
authorSebastian Harl <sh@tokkee.org>
Fri, 8 Dec 2006 14:12:42 +0000 (15:12 +0100)
committerFlorian Forster <octo@huhu.verplant.org>
Fri, 8 Dec 2006 14:29:34 +0000 (15:29 +0100)
Up to now the email plugin created a thread for each connection. This thread
was destroyed after the connection has been closed. Now, a pool containing
MaxConns threads is created and a new connection is assigned to a free thread.

Signed-off-by: Sebastian Harl <sh@tokkee.org>
src/email.c

index c1e139ea5aedd164ee65fcc5373b5234cf3c4a90..401ac928108ec6db4e7b8b1067de20c2dbea27ef 100644 (file)
@@ -100,24 +100,30 @@ typedef struct {
        type_t *tail;
 } type_list_t;
 
-/* linked list of collector thread control information */
+/* collector thread control information */
 typedef struct collector {
        pthread_t thread;
 
+       /* socket descriptor of the current/last connection */
+       int socket;
+} collector_t;
+
+/* linked list of pending connections */
+typedef struct conn {
        /* socket to read data from */
        int socket;
 
        /* buffer to read data to */
-       char buffer[BUFSIZE];
+       char *buffer;
        int  idx; /* current position in buffer */
 
-       struct collector *next;
-} collector_t;
+       struct conn *next;
+} conn_t;
 
 typedef struct {
-       collector_t *head;
-       collector_t *tail;
-} collector_list_t;
+       conn_t *head;
+       conn_t *tail;
+} conn_list_t;
 #endif /* EMAIL_HAVE_READ */
 
 /*
@@ -146,16 +152,21 @@ static int disabled = 0;
 static pthread_t connector;
 static int connector_socket;
 
+/* tell the collector threads that a new connection is available */
+static pthread_cond_t conn_available = PTHREAD_COND_INITIALIZER;
+
+/* connections that are waiting to be processed */
+static pthread_mutex_t conns_mutex = PTHREAD_MUTEX_INITIALIZER;
+static conn_list_t conns;
+
 /* tell the connector thread that a collector is available */
 static pthread_cond_t collector_available = PTHREAD_COND_INITIALIZER;
 
-/* collector threads that are in use */
-static pthread_mutex_t active_mutex = PTHREAD_MUTEX_INITIALIZER;
-static collector_list_t active;
+/* collector threads */
+static collector_t **collectors;
 
-/* collector threads that are available for use */
 static pthread_mutex_t available_mutex = PTHREAD_MUTEX_INITIALIZER;
-static collector_list_t available;
+static int available_collectors;
 
 static pthread_mutex_t count_mutex = PTHREAD_MUTEX_INITIALIZER;
 static type_list_t count;
@@ -275,7 +286,7 @@ static void type_list_incr (type_list_t *list, char *name, int incr)
 
 /* Read a single character from the socket. If an error occurs or end-of-file
  * is reached return '\0'. */
-char read_char (collector_t *src)
+char read_char (conn_t *src)
 {
        char ret = '\0';
 
@@ -306,7 +317,7 @@ char read_char (collector_t *src)
                        return '\0';
        } while (EINTR == errno);
        return ret;
-} /* char read_char (collector_t *) */
+} /* char read_char (conn_t *) */
 
 /* Read a single line (terminated by '\n') from the the socket.
  *
@@ -320,7 +331,7 @@ char read_char (collector_t *src)
  * characters of the input stream, the line will will be ignored! By
  * definition we should not get any longer input lines, thus this is
  * acceptable in this case ;-) */
-char *read_line (collector_t *src)
+char *read_line (conn_t *src)
 {
        int  i = 0;
        char *ret;
@@ -334,7 +345,7 @@ char *read_line (collector_t *src)
 
        if (i == src->idx) {
                fd_set fdset;
-       
+
                ssize_t len = 0;
 
                FD_ZERO (&fdset);
@@ -350,7 +361,7 @@ char *read_line (collector_t *src)
                do {
                        errno = 0;
                        if (0 > (len = read (src->socket,
-                                                       (void *)(&(src->buffer[0]) + src->idx),
+                                                       (void *)(src->buffer + src->idx),
                                                        BUFSIZE - src->idx))) {
                                if (EINTR != errno) {
                                        syslog (LOG_ERR, "read() failed: %s", strerror (errno));
@@ -385,7 +396,7 @@ char *read_line (collector_t *src)
        }
 
        ret = (char *)smalloc (i + 1);
-       memcpy (ret, &(src->buffer[0]), i + 1);
+       memcpy (ret, src->buffer, i + 1);
        ret[i] = '\0';
 
        src->idx -= (i + 1);
@@ -393,142 +404,134 @@ char *read_line (collector_t *src)
        if (0 == src->idx)
                src->buffer[0] = '\0';
        else
-               memmove (&(src->buffer[0]), &(src->buffer[i + 1]), src->idx);
+               memmove (src->buffer, src->buffer + i + 1, src->idx);
        return ret;
-} /* char *read_line (collector_t *) */
+} /* char *read_line (conn_t *) */
 
 static void *collect (void *arg)
 {
        collector_t *this = (collector_t *)arg;
-       
-       int loop = 1;
 
-       { /* put the socket in non-blocking mode */
-               int flags = 0;
+       char *buffer = (char *)smalloc (BUFSIZE);
 
-               errno = 0;
-               if (-1 == fcntl (this->socket, F_GETFL, &flags)) {
-                       syslog (LOG_ERR, "fcntl() failed: %s", strerror (errno));
-                       loop = 0;
-               }
+       while (1) {
+               int loop = 1;
 
-               errno = 0;
-               if (-1 == fcntl (this->socket, F_SETFL, flags | O_NONBLOCK)) {
-                       syslog (LOG_ERR, "fcntl() failed: %s", strerror (errno));
-                       loop = 0;
-               }
-       }
+               conn_t *connection;
 
-       while (loop) {
-               char *line = read_line (this);
+               pthread_mutex_lock (&conns_mutex);
 
-               if (NULL == line) {
-                       loop = 0;
-                       break;
+               while (NULL == conns.head) {
+                       pthread_cond_wait (&conn_available, &conns_mutex);
                }
 
-               if ('\0' == line[0]) {
-                       free (line);
-                       continue;
-               }
+               connection = conns.head;
+               conns.head = conns.head->next;
 
-               if (':' != line[1]) {
-                       syslog (LOG_ERR, "email: syntax error in line '%s'", line);
-                       free (line);
-                       continue;
+               if (NULL == conns.head) {
+                       conns.tail = NULL;
                }
 
-               if ('e' == line[0]) { /* e:<type>:<bytes> */
-                       char *ptr  = NULL;
-                       char *type = strtok_r (line + 2, ":", &ptr);
-                       char *tmp  = strtok_r (NULL, ":", &ptr);
-                       int  bytes = 0;
+               this->socket = connection->socket;
 
-                       if (NULL == tmp) {
-                               syslog (LOG_ERR, "email: syntax error in line '%s'", line);
-                               free (line);
-                               continue;
-                       }
+               pthread_mutex_unlock (&conns_mutex);
 
-                       bytes = atoi (tmp);
+               connection->buffer = buffer;
+               connection->idx    = 0;
 
-                       pthread_mutex_lock (&count_mutex);
-                       type_list_incr (&count, type, 1);
-                       pthread_mutex_unlock (&count_mutex);
+               { /* put the socket in non-blocking mode */
+                       int flags = 0;
 
-                       pthread_mutex_lock (&size_mutex);
-                       type_list_incr (&size, type, bytes);
-                       pthread_mutex_unlock (&size_mutex);
-               }
-               else if ('s' == line[0]) { /* s:<value> */
-                       pthread_mutex_lock (&score_mutex);
-                       score = (score * (double)score_count + atof (line + 2))
-                                       / (double)(score_count + 1);
-                       ++score_count;
-                       pthread_mutex_unlock (&score_mutex);
-               }
-               else if ('c' == line[0]) { /* c:<type1>[,<type2>,...] */
-                       char *ptr  = NULL;
-                       char *type = strtok_r (line + 2, ",", &ptr);
-
-                       do {
-                               pthread_mutex_lock (&check_mutex);
-                               type_list_incr (&check, type, 1);
-                               pthread_mutex_unlock (&check_mutex);
-                       } while (NULL != (type = strtok_r (NULL, ",", &ptr)));
-               }
-               else {
-                       syslog (LOG_ERR, "email: unknown type '%c'", line[0]);
+                       errno = 0;
+                       if (-1 == fcntl (connection->socket, F_GETFL, &flags)) {
+                               syslog (LOG_ERR, "fcntl() failed: %s", strerror (errno));
+                               loop = 0;
+                       }
+
+                       errno = 0;
+                       if (-1 == fcntl (connection->socket, F_SETFL, flags | O_NONBLOCK)) {
+                               syslog (LOG_ERR, "fcntl() failed: %s", strerror (errno));
+                               loop = 0;
+                       }
                }
 
-               free (line);
-       }
+               while (loop) {
+                       char *line = read_line (connection);
+
+                       if (NULL == line) {
+                               loop = 0;
+                               break;
+                       }
+
+                       if ('\0' == line[0]) {
+                               free (line);
+                               continue;
+                       }
 
-       /* put this thread back into the available list */
-       pthread_mutex_lock (&active_mutex);
-       {
-               collector_t *last;
-               collector_t *ptr;
+                       if (':' != line[1]) {
+                               syslog (LOG_ERR, "email: syntax error in line '%s'", line);
+                               free (line);
+                               continue;
+                       }
 
-               last = NULL;
+                       if ('e' == line[0]) { /* e:<type>:<bytes> */
+                               char *ptr  = NULL;
+                               char *type = strtok_r (line + 2, ":", &ptr);
+                               char *tmp  = strtok_r (NULL, ":", &ptr);
+                               int  bytes = 0;
 
-               for (ptr = active.head; NULL != ptr; last = ptr, ptr = ptr->next) {
-                       if (0 != pthread_equal (ptr->thread, this->thread))
-                               break;
-               }
+                               if (NULL == tmp) {
+                                       syslog (LOG_ERR, "email: syntax error in line '%s'", line);
+                                       free (line);
+                                       continue;
+                               }
 
-               /* the current thread _has_ to be in the active list */
-               assert (NULL != ptr);
+                               bytes = atoi (tmp);
 
-               if (NULL == last) {
-                       active.head = ptr->next;
-               }
-               else {
-                       last->next = ptr->next;
+                               pthread_mutex_lock (&count_mutex);
+                               type_list_incr (&count, type, 1);
+                               pthread_mutex_unlock (&count_mutex);
 
-                       if (NULL == last->next) {
-                               active.tail = last;
+                               pthread_mutex_lock (&size_mutex);
+                               type_list_incr (&size, type, bytes);
+                               pthread_mutex_unlock (&size_mutex);
                        }
-               }
-       }
-       pthread_mutex_unlock (&active_mutex);
+                       else if ('s' == line[0]) { /* s:<value> */
+                               pthread_mutex_lock (&score_mutex);
+                               score = (score * (double)score_count + atof (line + 2))
+                                               / (double)(score_count + 1);
+                               ++score_count;
+                               pthread_mutex_unlock (&score_mutex);
+                       }
+                       else if ('c' == line[0]) { /* c:<type1>[,<type2>,...] */
+                               char *ptr  = NULL;
+                               char *type = strtok_r (line + 2, ",", &ptr);
+
+                               do {
+                                       pthread_mutex_lock (&check_mutex);
+                                       type_list_incr (&check, type, 1);
+                                       pthread_mutex_unlock (&check_mutex);
+                               } while (NULL != (type = strtok_r (NULL, ",", &ptr)));
+                       }
+                       else {
+                               syslog (LOG_ERR, "email: unknown type '%c'", line[0]);
+                       }
+
+                       free (line);
+               } /* while (loop) */
 
-       this->next = NULL;
+               close (connection->socket);
 
-       pthread_mutex_lock (&available_mutex);
+               free (connection);
 
-       if (NULL == available.head) {
-               available.head = this;
-               available.tail = this;
-       }
-       else {
-               available.tail->next = this;
-               available.tail = this;
-       }
+               pthread_mutex_lock (&available_mutex);
+               ++available_collectors;
+               pthread_mutex_unlock (&available_mutex);
 
-       pthread_mutex_unlock (&available_mutex);
+               pthread_cond_signal (&collector_available);
+       } /* while (1) */
 
-       pthread_cond_signal (&collector_available);
+       free (buffer);
        pthread_exit ((void *)0);
 } /* void *collect (void *) */
 
@@ -589,40 +592,50 @@ static void *open_connection (void *arg)
                syslog (LOG_WARNING, "chmod() failed: %s", strerror (errno));
        }
 
-       { /* initialize queue of available threads */
-               int i = 0;
+       { /* initialize collector threads */
+               int i   = 0;
+               int err = 0;
 
-               collector_t *last;
+               pthread_attr_t ptattr;
+
+               conns.head = NULL;
+               conns.tail = NULL;
+
+               pthread_attr_init (&ptattr);
+               pthread_attr_setdetachstate (&ptattr, PTHREAD_CREATE_DETACHED);
 
-               active.head = NULL;
-               active.tail = NULL;
+               available_collectors = max_conns;
 
-               available.head = (collector_t *)smalloc (sizeof (collector_t));
-               available.tail = available.head;
-               available.tail->next = NULL;
+               collectors =
+                       (collector_t **)smalloc (max_conns * sizeof (collector_t *));
 
-               last = available.head;
+               for (i = 0; i < max_conns; ++i) {
+                       collectors[i] = (collector_t *)smalloc (sizeof (collector_t));
+                       collectors[i]->socket = 0;
 
-               for (i = 1; i < max_conns; ++i) {
-                       last->next = (collector_t *)smalloc (sizeof (collector_t));
-                       last = last->next;
-                       available.tail = last;
-                       available.tail->next = NULL;
+                       if (0 != (err = pthread_create (&collectors[i]->thread, &ptattr,
+                                                       collect, collectors[i]))) {
+                               syslog (LOG_ERR, "pthread_create() failed: %s",
+                                               strerror (err));
+                       }
                }
+
+               pthread_attr_destroy (&ptattr);
        }
 
        while (1) {
                int remote = 0;
-               int err    = 0;
-
-               collector_t *collector;
 
-               pthread_attr_t ptattr;
+               conn_t *connection;
 
                pthread_mutex_lock (&available_mutex);
-               while (NULL == available.head) {
+
+               while (0 == available_collectors) {
                        pthread_cond_wait (&collector_available, &available_mutex);
                }
+
+               --available_collectors;
+
                pthread_mutex_unlock (&available_mutex);
 
                do {
@@ -636,62 +649,25 @@ static void *open_connection (void *arg)
                        }
                } while (EINTR == errno);
 
-               /* assign connection to next available thread */
-               pthread_mutex_lock (&available_mutex);
-
-               collector = available.head;
-               collector->socket = remote;
-
-               if (available.head == available.tail) {
-                       available.head = NULL;
-                       available.tail = NULL;
-               }
-               else {
-                       available.head = available.head->next;
-               }
-
-               pthread_mutex_unlock (&available_mutex);
-
-               collector->idx  = 0;
-               collector->next = NULL;
-
-               pthread_attr_init (&ptattr);
-               pthread_attr_setdetachstate (&ptattr, PTHREAD_CREATE_DETACHED);
+               connection = (conn_t *)smalloc (sizeof (conn_t));
 
-               if (0 == (err = pthread_create (&collector->thread, &ptattr, collect,
-                               (void *)collector))) {
-                       pthread_mutex_lock (&active_mutex);
+               connection->socket = remote;
+               connection->next   = NULL;
 
-                       if (NULL == active.head) {
-                               active.head = collector;
-                               active.tail = collector;
-                       }
-                       else {
-                               active.tail->next = collector;
-                               active.tail = collector;
-                       }
+               pthread_mutex_lock (&conns_mutex);
 
-                       pthread_mutex_unlock (&active_mutex);
+               if (NULL == conns.head) {
+                       conns.head = connection;
+                       conns.tail = connection;
                }
                else {
-                       pthread_mutex_lock (&available_mutex);
-
-                       if (NULL == available.head) {
-                               available.head = collector;
-                               available.tail = collector;
-                       }
-                       else {
-                               available.tail->next = collector;
-                               available.tail = collector;
-                       }
-
-                       pthread_mutex_unlock (&available_mutex);
-
-                       close (remote);
-                       syslog (LOG_ERR, "pthread_create() failed: %s", strerror (err));
+                       conns.tail->next = connection;
+                       conns.tail = conns.tail->next;
                }
 
-               pthread_attr_destroy (&ptattr);
+               pthread_mutex_unlock (&conns_mutex);
+
+               pthread_cond_signal (&conn_available);
        }
        pthread_exit ((void *)0);
 } /* void *open_connection (void *) */
@@ -715,7 +691,7 @@ static void email_init (void)
 #if EMAIL_HAVE_READ
 static void email_shutdown (void)
 {
-       collector_t *ptr;
+       int i = 0;
 
        if (disabled)
                return;
@@ -723,14 +699,15 @@ static void email_shutdown (void)
        close (connector_socket);
        pthread_kill (connector, SIGTERM);
 
-       pthread_mutex_lock (&active_mutex);
+       /* don't allow any more connections to be processed */
+       pthread_mutex_lock (&conns_mutex);
 
-       for (ptr = active.head; NULL != ptr; ptr = ptr->next) {
-               close (ptr->socket);
-               pthread_kill (ptr->thread, SIGTERM);
+       for (i = 0; i < max_conns; ++i) {
+               close (collectors[i]->socket);
+               pthread_kill (collectors[i]->thread, SIGTERM);
        }
 
-       pthread_mutex_unlock (&active_mutex);
+       pthread_mutex_unlock (&conns_mutex);
 
        unlink (SOCK_PATH);
        return;