488fbac7f9aafee4b803672059a24ce10d146053
1 /*
2 **
3 ** collectd-amqp
4 ** Copyright (c) <2009> <sebastien.pahl@dotcloud.com>
5 **
6 ** Permission is hereby granted, free of charge, to any person
7 ** obtaining a copy of this software and associated documentation
8 ** files (the "Software"), to deal in the Software without
9 ** restriction, including without limitation the rights to use,
10 ** copy, modify, merge, publish, distribute, sublicense, and/or sell
11 ** copies of the Software, and to permit persons to whom the
12 ** Software is furnished to do so, subject to the following
13 ** conditions:
14 **
15 ** The above copyright notice and this permission notice shall be
16 ** included in all copies or substantial portions of the Software.
17 **
18 ** THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
19 ** EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES
20 ** OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
21 ** NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT
22 ** HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY,
23 ** WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
24 ** FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR
25 ** OTHER DEALINGS IN THE SOFTWARE.
26 **
27 */
29 #include <stdint.h>
30 #include <stdlib.h>
31 #include <unistd.h>
32 #include <strings.h>
33 #include <pthread.h>
35 #include "collectd.h"
36 #include "common.h"
37 #include "plugin.h"
38 #include "utils_format_json.h"
40 #include <amqp.h>
41 #include <amqp_framing.h>
/* Values for the AMQP "delivery mode" message property, as stored in
 * "delivery_mode" and copied into the properties of every published message:
 * 1 marks a message as volatile (non-persistent), 2 as persistent. They are
 * defined here because the librabbitmq headers included above are not relied
 * upon to provide named constants for them. */
#define AMQP_DM_VOLATILE 1
#define AMQP_DM_PERSISTENT 2
48 static int port = 5672;
49 static char *host = NULL;
50 static char *vhost = NULL;
51 static char *user = NULL;
52 static char *password = NULL;
53 static char *exchange = NULL;
54 static char *routingkey = NULL;
55 static uint8_t delivery_mode = AMQP_DM_VOLATILE;
57 static amqp_connection_state_t amqp_conn = NULL;
58 static pthread_mutex_t amqp_conn_lock = PTHREAD_MUTEX_INITIALIZER;
60 static const char *config_keys[] =
61 {
62 "Host",
63 "Port",
64 "VHost",
65 "User",
66 "Password",
67 "Exchange",
68 "RoutingKey",
69 "Persistent"
70 };
72 static int config_keys_num = STATIC_ARRAY_SIZE(config_keys);
/*
 * Replaces the string stored in *var with a newly allocated copy of value.
 *
 * The copy is created before the previous string is released, so *var keeps
 * its old contents when value is NULL or when the allocation fails.
 *
 * var:   address of the (possibly NULL) heap-allocated string to replace.
 * value: string to copy; not modified.
 *
 * Returns 0 on success and 1 on failure.
 */
static int config_set(char **var, const char *value)
{
    char *copy;

    if ((var == NULL) || (value == NULL))
        return (1);

    copy = strdup(value);
    if (copy == NULL)
        return (1);

    free(*var); /* free(NULL) is a no-op, so an unset option is fine */
    *var = copy;
    return (0);
}
82 static int config(const char *key, const char *value)
83 {
84 if (strcasecmp(key, "host") == 0)
85 return (config_set(&host, value));
86 else if(strcasecmp(key, "port") == 0)
87 {
88 int tmp;
90 tmp = service_name_to_port_number (value);
91 if (tmp <= 0)
92 {
93 ERROR ("AMQP plugin: Cannot parse `%s' as a "
94 "service name (port number).", value);
95 return (1);
96 }
98 port = tmp;
99 return (0);
100 }
101 else if (strcasecmp(key, "vhost") == 0)
102 return (config_set(&vhost, value));
103 else if (strcasecmp(key, "user") == 0)
104 return (config_set(&user, value));
105 else if (strcasecmp(key, "password") == 0)
106 return (config_set(&password, value));
107 else if (strcasecmp(key, "exchange") == 0)
108 return (config_set(&exchange, value));
109 else if (strcasecmp(key, "routingkey") == 0)
110 return (config_set(&routingkey, value));
111 else if (strcasecmp ("Persistent", key) == 0)
112 {
113 if (IS_TRUE (value))
114 delivery_mode = AMQP_DM_PERSISTENT;
115 else
116 delivery_mode = AMQP_DM_VOLATILE;
117 return (0);
118 }
119 return (-1);
120 }
122 static int amqp_connect (void)
123 {
124 amqp_rpc_reply_t reply;
125 int sockfd;
126 int status;
128 if (amqp_conn != NULL)
129 return (0);
131 amqp_conn = amqp_new_connection ();
132 if (amqp_conn == NULL)
133 {
134 ERROR ("amqp plugin: amqp_new_connection failed.");
135 return (ENOMEM);
136 }
138 sockfd = amqp_open_socket (host, port);
139 if (sockfd < 0)
140 {
141 char errbuf[1024];
142 status = (-1) * sockfd;
143 ERROR ("amqp plugin: amqp_open_socket failed: %s",
144 sstrerror (status, errbuf, sizeof (errbuf)));
145 amqp_destroy_connection(amqp_conn);
146 amqp_conn = NULL;
147 return (status);
148 }
150 amqp_set_sockfd (amqp_conn, sockfd);
152 reply = amqp_login(amqp_conn, vhost,
153 /* channel max = */ 0,
154 /* frame max = */ 131072,
155 /* heartbeat = */ 0,
156 /* authentication: */ AMQP_SASL_METHOD_PLAIN, user, password);
157 if (reply.reply_type != AMQP_RESPONSE_NORMAL)
158 {
159 ERROR ("amqp plugin: amqp_login (vhost = %s, user = %s) failed.",
160 vhost, user);
161 amqp_destroy_connection(amqp_conn);
162 close(sockfd);
163 amqp_conn = NULL;
164 return (1);
165 }
167 amqp_channel_open (amqp_conn, /* channel = */ 1);
168 /* FIXME: Is checking "reply.reply_type" really correct here? How does
169 * it get set? --octo */
170 if (reply.reply_type != AMQP_RESPONSE_NORMAL)
171 {
172 ERROR ("amqp plugin: amqp_channel_open failed.");
173 amqp_connection_close (amqp_conn, AMQP_REPLY_SUCCESS);
174 amqp_destroy_connection(amqp_conn);
175 close(sockfd);
176 amqp_conn = NULL;
177 return (1);
178 }
180 INFO ("amqp plugin: Successfully opened connection to vhost \"%s\" "
181 "on %s:%i.", vhost, host, port);
182 return (0);
183 } /* int amqp_connect */
185 static int amqp_write_locked (const char *buffer)
186 {
187 amqp_basic_properties_t props;
188 int status;
190 status = amqp_connect ();
191 if (status != 0)
192 return (status);
194 memset (&props, 0, sizeof (props));
195 props._flags = AMQP_BASIC_CONTENT_TYPE_FLAG | AMQP_BASIC_DELIVERY_MODE_FLAG;
196 props.content_type = amqp_cstring_bytes("application/json");
197 props.delivery_mode = delivery_mode;
199 status = amqp_basic_publish(amqp_conn,
200 /* channel = */ 1,
201 amqp_cstring_bytes(exchange),
202 amqp_cstring_bytes(routingkey),
203 /* mandatory = */ 0,
204 /* immediate = */ 0,
205 &props,
206 amqp_cstring_bytes(buffer));
207 if (status != 0)
208 {
209 int sockfd;
211 ERROR ("amqp plugin: amqp_basic_publish failed with status %i.",
212 status);
214 sockfd = amqp_get_sockfd (amqp_conn);
215 amqp_channel_close (amqp_conn, 1, AMQP_REPLY_SUCCESS);
216 amqp_connection_close (amqp_conn, AMQP_REPLY_SUCCESS);
217 amqp_destroy_connection (amqp_conn);
218 close (sockfd);
219 amqp_conn = NULL;
220 }
222 return (status);
223 } /* int amqp_write_locked */
225 static int amqp_write (const data_set_t *ds, const value_list_t *vl,
226 __attribute__((unused)) user_data_t *user_data)
227 {
228 char buffer[4096];
229 size_t bfree;
230 size_t bfill;
231 int status;
233 if ((ds == NULL) || (vl == NULL))
234 return (EINVAL);
236 memset (buffer, 0, sizeof (buffer));
237 bfree = sizeof (buffer);
238 bfill = 0;
240 format_json_initialize(buffer, &bfill, &bfree);
241 /* TODO: Possibly add a config option "StoreRates" and pass the value along here. */
242 format_json_value_list(buffer, &bfill, &bfree, ds, vl, /* rates = */ 0);
243 format_json_finalize(buffer, &bfill, &bfree);
245 pthread_mutex_lock (&amqp_conn_lock);
246 status = amqp_write_locked (buffer);
247 pthread_mutex_unlock (&amqp_conn_lock);
249 return (status);
250 } /* int amqp_write */
252 static int shutdown(void)
253 {
254 pthread_mutex_lock (&amqp_conn_lock);
255 if (amqp_conn != NULL)
256 {
257 int sockfd;
259 sockfd = amqp_get_sockfd (amqp_conn);
260 amqp_channel_close (amqp_conn, 1, AMQP_REPLY_SUCCESS);
261 amqp_connection_close (amqp_conn, AMQP_REPLY_SUCCESS);
262 amqp_destroy_connection (amqp_conn);
263 close(sockfd);
264 amqp_conn = NULL;
265 }
266 pthread_mutex_unlock (&amqp_conn_lock);
268 sfree(host);
269 sfree(vhost);
270 sfree(user);
271 sfree(password);
272 sfree(exchange);
273 sfree(routingkey);
275 return (0);
276 }
278 void module_register(void)
279 {
280 plugin_register_config("amqp", config, config_keys, config_keys_num);
281 plugin_register_write("amqp", amqp_write, NULL);
282 plugin_register_shutdown("amqp", shutdown);
283 }
285 /* vim: set sw=4 sts=4 et : */