1 /* SPDX-License-Identifier: LGPL-2.1-or-later */
2
3 #include <errno.h>
4 #include <fcntl.h>
5 #include <stdlib.h>
6 #include <sys/prctl.h>
7 #include <stdint.h>
8
9 #include "sd-daemon.h"
10
11 #include "af-list.h"
12 #include "alloc-util.h"
13 #include "def.h"
14 #include "errno-util.h"
15 #include "escape.h"
16 #include "fd-util.h"
17 #include "journal-remote-write.h"
18 #include "journal-remote.h"
19 #include "journald-native.h"
20 #include "macro.h"
21 #include "managed-journal-file.h"
22 #include "parse-util.h"
23 #include "process-util.h"
24 #include "socket-util.h"
25 #include "stdio-util.h"
26 #include "string-util.h"
27 #include "strv.h"
28
/* Default location for received journals. journal_remote_server_init() uses
 * this directory (or "remote.journal" inside it) when no output path is
 * supplied by the caller. */
#define REMOTE_JOURNAL_PATH "/var/log/journal/remote"

/* Escapes a host name before it is embedded in an output file name (see
 * open_output()). Expands to xescape() with "/ " as its second argument —
 * presumably the set of characters to escape; confirm against xescape().
 * The result is heap-allocated (open_output() releases it with
 * _cleanup_free_) and NULL is treated as out-of-memory. */
#define filename_escape(s) xescape((s), "/ ")
32
/* Opens the output journal file that writer 'w' will write to.
 *
 * The path depends on the server's split mode:
 *   - JOURNAL_WRITE_SPLIT_NONE: s->output is used as the file name as-is;
 *   - JOURNAL_WRITE_SPLIT_HOST: "<s->output>/remote-<escaped host>.journal",
 *     so 'host' must be non-NULL in this mode.
 *
 * On success w->journal refers to the opened file and 0 is returned. On
 * failure a negative error is returned after logging. */
static int open_output(RemoteServer *s, Writer *w, const char* host) {
        _cleanup_free_ char *allocated_path = NULL;
        const char *path;
        int r;

        if (s->split_mode == JOURNAL_WRITE_SPLIT_NONE)
                path = s->output;
        else if (s->split_mode == JOURNAL_WRITE_SPLIT_HOST) {
                _cleanup_free_ char *escaped = NULL;

                assert(host);

                /* The host name becomes part of a file name, hence escape it first. */
                escaped = filename_escape(host);
                if (!escaped)
                        return log_oom();

                r = asprintf(&allocated_path, "%s/remote-%s.journal", s->output, escaped);
                if (r < 0)
                        return log_oom();

                path = allocated_path;
        } else
                assert_not_reached();

        r = managed_journal_file_open_reliably(
                        path,
                        O_RDWR|O_CREAT,
                        s->file_flags,
                        0640,
                        UINT64_MAX,
                        &w->metrics,
                        w->mmap,
                        NULL,
                        NULL,
                        &w->journal);
        if (r < 0)
                return log_error_errno(r, "Failed to open output journal %s: %m", path);

        log_debug("Opened output file %s", w->journal->file->path);
        return 0;
}
81
82 /**********************************************************************
83 **********************************************************************
84 **********************************************************************/
85
/* Allocates s->writers, the map from writer key to Writer.
 *
 * The hash operations are selected by split mode: string keys for
 * JOURNAL_WRITE_SPLIT_HOST, and NULL ops for JOURNAL_WRITE_SPLIT_NONE.
 *
 * Returns 0 on success or the result of log_oom() if allocation fails. */
static int init_writer_hashmap(RemoteServer *s) {
        /* Indexed by JournalWriteSplitMode. */
        static const struct hash_ops* const hash_ops[] = {
                [JOURNAL_WRITE_SPLIT_NONE] = NULL,
                [JOURNAL_WRITE_SPLIT_HOST] = &string_hash_ops,
        };

        assert(s);
        assert(s->split_mode >= 0 && s->split_mode < (int) ELEMENTSOF(hash_ops));

        s->writers = hashmap_new(hash_ops[s->split_mode]);

        return s->writers ? 0 : log_oom();
}
101
/* Returns the writer responsible for 'host', creating it on first use.
 *
 * The lookup key is the constant "one and only" in JOURNAL_WRITE_SPLIT_NONE
 * mode and the host name in JOURNAL_WRITE_SPLIT_HOST mode (where 'host' must
 * be non-NULL).
 *
 * On success 0 is returned and *writer holds a reference: an additional one
 * taken with writer_ref() if the writer already existed, or the one obtained
 * from writer_new() if it was just created. On failure a negative error is
 * returned and *writer is left untouched. */
int journal_remote_get_writer(RemoteServer *s, const char *host, Writer **writer) {
        _cleanup_(writer_unrefp) Writer *w = NULL;
        const void *key;
        int r;

        switch (s->split_mode) {
        case JOURNAL_WRITE_SPLIT_NONE:
                key = "one and only";
                break;

        case JOURNAL_WRITE_SPLIT_HOST:
                assert(host);
                key = host;
                break;

        default:
                assert_not_reached();
        }

        /* Fast path: a writer for this key is already registered. */
        w = hashmap_get(s->writers, key);
        if (w) {
                writer_ref(w);
                *writer = TAKE_PTR(w);
                return 0;
        }

        w = writer_new(s);
        if (!w)
                return log_oom();

        if (s->split_mode == JOURNAL_WRITE_SPLIT_HOST) {
                /* 'host' belongs to the caller, so the writer keeps its own copy to
                 * serve as the hashmap key. */
                w->hashmap_key = strdup(key);
                if (!w->hashmap_key)
                        return log_oom();
        }

        r = open_output(s, w, host);
        if (r < 0)
                return r;

        /* Use the writer-owned key if one was set above, else the constant key. */
        r = hashmap_put(s->writers, w->hashmap_key ?: key, w);
        if (r < 0)
                return r;

        *writer = TAKE_PTR(w);
        return 0;
}
148
149 /**********************************************************************
150 **********************************************************************
151 **********************************************************************/
152
/* This should go away as soon as µhttpd allows state to be passed around.
 *
 * Pointer to the single active server: set by journal_remote_server_init()
 * (which asserts that it was previously NULL), reset to NULL by
 * journal_remote_server_destroy(), and read by the raw-source dispatch
 * callbacks below to locate the server. */
RemoteServer *journal_remote_server_global;
155
/* Forward declarations of the sd-event callbacks registered further down. */

/* I/O callback attached to a source's fd by journal_remote_add_source(). */
static int dispatch_raw_source_event(sd_event_source *event,
                                     int fd,
                                     uint32_t revents,
                                     void *userdata);
/* Deferred callback attached to a source's buffer_event by
 * journal_remote_add_source(); it starts out disabled. */
static int dispatch_raw_source_until_block(sd_event_source *event,
                                           void *userdata);
/* Deferred callback used by journal_remote_add_source() as a fallback when
 * sd_event_add_io() fails with -EPERM. */
static int dispatch_blocking_source_event(sd_event_source *event,
                                          void *userdata);
/* I/O callback attached to a listening socket by
 * journal_remote_add_raw_socket(). */
static int dispatch_raw_connection_event(sd_event_source *event,
                                         int fd,
                                         uint32_t revents,
                                         void *userdata);
168
/* Returns the RemoteSource registered for 'fd', creating it if the slot in
 * s->sources is still empty.
 *
 * s->sources is indexed by fd and is grown (zero-filled) as needed. A writer
 * for 'name' is looked up via journal_remote_get_writer(); when a new source
 * is created, both the writer reference and 'name' are handed to
 * source_new(), and s->active is incremented.
 *
 * Ownership of 'name' is taken only on success, and only when a new source is
 * actually created; on failure the caller still owns it.
 *
 * Returns 0 and stores the source in *source on success, or a negative error
 * after logging. */
static int get_source_for_fd(RemoteServer *s,
                             int fd, char *name, RemoteSource **source) {
        Writer *writer;
        int r;

        assert(fd >= 0);
        assert(source);

        if (!GREEDY_REALLOC0(s->sources, fd + 1))
                return log_oom();

        r = journal_remote_get_writer(s, name, &writer);
        if (r < 0)
                return log_warning_errno(r, "Failed to get writer for source %s: %m",
                                         name);

        if (s->sources[fd])
                /* A source already exists for this fd, so nothing will consume the
                 * writer reference we just obtained: release it rather than leak it. */
                writer_unref(writer);
        else {
                s->sources[fd] = source_new(fd, false, name, writer);
                if (!s->sources[fd]) {
                        writer_unref(writer);
                        return log_oom();
                }

                s->active++;
        }

        *source = s->sources[fd];
        return 0;
}
200
/* Destroys the source registered for 'fd', if there is one.
 *
 * 'fd' must lie within the currently allocated s->sources array. When a
 * source is present it is freed (which also closes the fd), its slot is
 * cleared and s->active is decremented. An empty slot is not an error.
 *
 * Always returns 0. */
static int remove_source(RemoteServer *s, int fd) {
        assert(s);
        assert(fd >= 0 && fd < (ssize_t) MALLOC_ELEMENTSOF(s->sources));

        if (!s->sources[fd])
                return 0;

        /* this closes fd too */
        source_free(s->sources[fd]);
        s->sources[fd] = NULL;
        s->active--;

        return 0;
}
217
/* Registers 'fd' as a new input source named 'name' and hooks it into the
 * server's event loop.
 *
 * If own_name is true this function takes ownership of 'name', even on
 * failure; otherwise it works on a private copy.
 *
 * The fd is first registered as an I/O event source, together with a
 * (disabled) deferred source used to drain buffered data. If I/O registration
 * fails with -EPERM, a permanently enabled deferred source is used instead.
 *
 * Returns 1 ("work to do") on success, or a negative error after logging.
 * If registration fails after the source was created, the source is removed
 * again via remove_source(). */
int journal_remote_add_source(RemoteServer *s, int fd, char* name, bool own_name) {
        RemoteSource *source = NULL;
        int r;

        assert(s);
        assert(fd >= 0);
        assert(name);

        if (!own_name) {
                name = strdup(name);
                if (!name)
                        return log_oom();
        }

        r = get_source_for_fd(s, fd, name, &source);
        if (r < 0) {
                log_error_errno(r, "Failed to create source for fd:%d (%s): %m",
                                fd, name);
                /* get_source_for_fd() takes the name only on success. */
                free(name);
                return r;
        }

        r = sd_event_add_io(s->events, &source->event,
                            fd, EPOLLIN|EPOLLRDHUP|EPOLLPRI,
                            dispatch_raw_source_event, source);
        if (r == -EPERM) {
                log_debug("Falling back to sd_event_add_defer for fd:%d (%s)", fd, name);
                r = sd_event_add_defer(s->events, &source->event,
                                       dispatch_blocking_source_event, source);
                if (r == 0)
                        r = sd_event_source_set_enabled(source->event, SD_EVENT_ON);
        } else if (r == 0) {
                /* Add additional source for buffer processing. It will be
                 * enabled later. */
                r = sd_event_add_defer(s->events, &source->buffer_event,
                                       dispatch_raw_source_until_block, source);
                if (r == 0)
                        r = sd_event_source_set_enabled(source->buffer_event, SD_EVENT_OFF);
        }
        if (r < 0) {
                log_error_errno(r, "Failed to register event source for fd:%d: %m",
                                fd);
                remove_source(s, fd);
                return r;
        }

        r = sd_event_source_set_description(source->event, name);
        if (r < 0) {
                log_error_errno(r, "Failed to set source name for fd:%d: %m", fd);
                remove_source(s, fd);
                return r;
        }

        return 1; /* work to do */
}
277
/* Starts listening for raw connections on the socket 'fd'.
 *
 * An I/O event source for EPOLLIN is registered in s->listen_event with
 * dispatch_raw_connection_event() as its callback, and is given the
 * description "raw-socket-<fd>".
 *
 * Ownership of 'fd' is taken in all cases: on failure it is closed before
 * returning, on success it stays open and s->active is incremented.
 *
 * Returns 0 on success or a negative error. */
int journal_remote_add_raw_socket(RemoteServer *s, int fd) {
        /* Closes fd on every early return; disarmed below once we succeeded. */
        _unused_ _cleanup_close_ int fd_ = fd;
        char name[STRLEN("raw-socket-") + DECIMAL_STR_MAX(int) + 1];
        int r;

        assert(s);
        assert(fd >= 0);

        r = sd_event_add_io(s->events, &s->listen_event,
                            fd, EPOLLIN,
                            dispatch_raw_connection_event, s);
        if (r < 0)
                return r;

        xsprintf(name, "raw-socket-%d", fd);

        r = sd_event_source_set_description(s->listen_event, name);
        if (r < 0) {
                /* fd is about to be closed by the cleanup handler, so do not leave an
                 * event source behind that still watches that fd number. */
                s->listen_event = sd_event_source_unref(s->listen_event);
                return r;
        }

        fd_ = -1;
        s->active++;
        return 0;
}
301
302 /**********************************************************************
303 **********************************************************************
304 **********************************************************************/
305
/* Initializes the server object 's' and publishes it as the global server.
 *
 * 'output' selects where journals are written. If it is NULL, a default
 * derived from REMOTE_JOURNAL_PATH is used: a single "remote.journal" file in
 * JOURNAL_WRITE_SPLIT_NONE mode, or the directory itself in
 * JOURNAL_WRITE_SPLIT_HOST mode. The pointer is stored, not copied.
 *
 * Also acquires the default event loop and allocates the writer map.
 *
 * Only one server may be initialized at a time (asserted). Returns 0 on
 * success or a negative error. */
int journal_remote_server_init(
                RemoteServer *s,
                const char *output,
                JournalWriteSplitMode split_mode,
                JournalFileFlags file_flags) {

        int r;

        assert(s);

        assert(journal_remote_server_global == NULL);
        journal_remote_server_global = s;

        s->split_mode = split_mode;
        s->file_flags = file_flags;

        if (output)
                s->output = output;
        else
                switch (split_mode) {
                case JOURNAL_WRITE_SPLIT_NONE:
                        s->output = REMOTE_JOURNAL_PATH "/remote.journal";
                        break;

                case JOURNAL_WRITE_SPLIT_HOST:
                        s->output = REMOTE_JOURNAL_PATH;
                        break;

                default:
                        assert_not_reached();
                }

        r = sd_event_default(&s->events);
        if (r < 0)
                return log_error_errno(r, "Failed to allocate event loop: %m");

        r = init_writer_hashmap(s);
        if (r < 0)
                return r;

        return 0;
}
341
342 #if HAVE_MICROHTTPD
/* Destructor for entries of s->daemons (see journal_remote_server_destroy()):
 * stops the wrapped microhttpd daemon, drops the wrapper's references to its
 * I/O and timer event sources, then frees the wrapper itself. */
static void MHDDaemonWrapper_free(MHDDaemonWrapper *d) {
        /* Stop the daemon first, before releasing the event sources. */
        MHD_stop_daemon(d->daemon);
        sd_event_source_unref(d->io_event);
        sd_event_source_unref(d->timer_event);
        free(d);
}
349 #endif
350
/* Releases everything owned by the server 's': the microhttpd daemons (when
 * built with HAVE_MICROHTTPD), all registered sources, the writers, the event
 * sources and the event loop reference. If 's' is the global server, the
 * global pointer is reset to NULL.
 *
 * The RemoteServer structure itself is not freed here. */
void journal_remote_server_destroy(RemoteServer *s) {
#if HAVE_MICROHTTPD
        hashmap_free_with_destructor(s->daemons, MHDDaemonWrapper_free);
#endif

        /* s->sources is indexed by fd; empty slots are skipped by remove_source(). */
        for (size_t fd = 0; fd < MALLOC_ELEMENTSOF(s->sources); fd++)
                remove_source(s, fd);
        free(s->sources);

        writer_unref(s->_single_writer);
        hashmap_free(s->writers);

        sd_event_source_unref(s->sigterm_event);
        sd_event_source_unref(s->sigint_event);
        sd_event_source_unref(s->listen_event);
        sd_event_unref(s->events);

        if (journal_remote_server_global == s)
                journal_remote_server_global = NULL;

        /* fds that we're listening on remain open... */
}
375
376 /**********************************************************************
377 **********************************************************************
378 **********************************************************************/
379
/* Processes pending input of the source registered for 'fd'.
 *
 * Returns 1 if there might be more data pending and the caller should call
 * again, 0 if there is nothing more to do for now (input exhausted, EOF, or
 * the connection was closed after an error). Errors from process_source()
 * are handled here and never propagated as negative return values.
 *
 * On EOF, and on any error other than -E2BIG, -ENOBUFS and -EAGAIN, the
 * source is removed, which also closes its fd. 'event' and 'revents' are
 * not used. */
int journal_remote_handle_raw_source(
                sd_event_source *event,
                int fd,
                uint32_t revents,
                RemoteServer *s) {

        RemoteSource *source;
        int r;

        assert(fd >= 0 && fd < (ssize_t) MALLOC_ELEMENTSOF(s->sources));
        source = s->sources[fd];
        assert(source->importer.fd == fd);

        r = process_source(source, s->file_flags);

        /* EOF takes precedence over whatever process_source() returned. */
        if (journal_importer_eof(&source->importer)) {
                size_t remaining;

                log_debug("EOF reached with source %s (fd=%d)",
                          source->importer.name, source->importer.fd);

                remaining = journal_importer_bytes_remaining(&source->importer);
                if (remaining > 0)
                        log_notice("Premature EOF. %zu bytes lost.", remaining);
                remove_source(s, source->importer.fd);
                log_debug("%zu active sources remaining", s->active);
                return 0;
        }

        switch (r) {

        case -E2BIG:
                log_notice("Entry with too many fields, skipped");
                return 1;

        case -ENOBUFS:
                log_notice("Entry too big, skipped");
                return 1;

        case -EAGAIN:
                return 0;

        default:
                if (r >= 0)
                        return 1;

                log_debug_errno(r, "Closing connection: %m");
                remove_source(s, fd);
                return 0;
        }
}
425
/* Deferred event callback that keeps processing a source's buffered data.
 *
 * 'userdata' is the RemoteSource. Each invocation runs one round of
 * journal_remote_handle_raw_source(); unless that reports more pending data
 * (returns 1), this event source disables itself again.
 *
 * Returns the handler's result, or the negative error from disabling the
 * event source if that fails. */
static int dispatch_raw_source_until_block(sd_event_source *event,
                                           void *userdata) {
        RemoteSource *src = userdata;
        int r;

        /* Make sure event stays around even if source is destroyed */
        sd_event_source_ref(event);

        r = journal_remote_handle_raw_source(event, src->importer.fd, EPOLLIN, journal_remote_server_global);
        if (r != 1) {
                /* No more data for now */
                int q = sd_event_source_set_enabled(event, SD_EVENT_OFF);

                if (q < 0)
                        r = q;
        }

        sd_event_source_unref(event);

        return r;
}
448
/* I/O event callback for a source's fd.
 *
 * 'userdata' is the RemoteSource, which must have both its I/O event and its
 * buffer event set up. Runs one round of journal_remote_handle_raw_source();
 * if that reports more pending data (returns 1), the source's deferred
 * buffer event is enabled so that processing continues. 'revents' is ignored.
 *
 * Returns the handler's result, or the negative error from enabling the
 * buffer event if that fails. */
static int dispatch_raw_source_event(sd_event_source *event,
                                     int fd,
                                     uint32_t revents,
                                     void *userdata) {
        RemoteSource *src = userdata;
        int r, q;

        assert(src->event);
        assert(src->buffer_event);

        r = journal_remote_handle_raw_source(event, fd, EPOLLIN, journal_remote_server_global);
        if (r != 1)
                return r;

        /* Might have more data. We need to rerun the handler
         * until we are sure the buffer is exhausted. */
        q = sd_event_source_set_enabled(src->buffer_event, SD_EVENT_ON);

        return q < 0 ? q : r;
}
472
/* Deferred event callback for sources that could not be registered as I/O
 * event sources (see the -EPERM fallback in journal_remote_add_source()).
 *
 * 'userdata' is the RemoteSource. Runs one round of
 * journal_remote_handle_raw_source() on its fd and returns the result. */
static int dispatch_blocking_source_event(sd_event_source *event,
                                          void *userdata) {
        RemoteSource *src = userdata;
        int fd = src->importer.fd;

        return journal_remote_handle_raw_source(event, fd, EPOLLIN, journal_remote_server_global);
}
479
/* Accepts one pending connection on the listening socket 'fd'.
 *
 * 'type' is only used in log messages. The peer address is stored in 'addr'.
 * Only AF_INET and AF_INET6 peers are accepted; for those, a newly allocated
 * peer name obtained from socknameinfo_pretty() is stored in *hostname, and
 * the caller becomes its owner.
 *
 * Returns the new connection fd (opened non-blocking and close-on-exec) on
 * success, -EAGAIN if accept4() failed with an errno matched by
 * ERRNO_IS_ACCEPT_AGAIN(), or another negative error after logging. On any
 * failure the accepted fd, if there is one, is closed and *hostname is not
 * written. */
static int accept_connection(
                const char* type,
                int fd,
                SocketAddress *addr,
                char **hostname) {

        _cleanup_close_ int conn_fd = -1;
        _cleanup_free_ char *printed = NULL;
        char *peer;
        int r;

        log_debug("Accepting new %s connection on fd:%d", type, fd);
        conn_fd = accept4(fd, &addr->sockaddr.sa, &addr->size, SOCK_NONBLOCK|SOCK_CLOEXEC);
        if (conn_fd < 0) {
                if (ERRNO_IS_ACCEPT_AGAIN(errno))
                        return -EAGAIN;

                return log_error_errno(errno, "accept() on fd:%d failed: %m", fd);
        }

        /* Reject everything that is not IPv4 or IPv6 up front. */
        switch (socket_address_family(addr)) {
        case AF_INET:
        case AF_INET6:
                break;

        default:
                return log_error_errno(SYNTHETIC_ERRNO(EINVAL),
                                       "Rejected %s connection with unsupported family %d",
                                       type, socket_address_family(addr));
        }

        r = socket_address_print(addr, &printed);
        if (r < 0)
                return log_error_errno(r, "socket_address_print(): %m");

        r = socknameinfo_pretty(&addr->sockaddr, addr->size, &peer);
        if (r < 0)
                return log_error_errno(r, "Resolving hostname failed: %m");

        log_debug("Accepted %s %s connection from %s",
                  type,
                  af_to_ipv4_ipv6(socket_address_family(addr)),
                  printed);

        *hostname = peer;
        return TAKE_FD(conn_fd);
}
527
/* I/O event callback for a raw listening socket.
 *
 * 'userdata' is the RemoteServer. Accepts one connection on 'fd' and
 * registers it as a new source named after the peer. 'revents' is ignored.
 *
 * Returns 0 if there was nothing to accept (-EAGAIN), a negative error if
 * accepting failed, and otherwise the result of journal_remote_add_source().
 *
 * NOTE(review): journal_remote_add_source() always takes ownership of the
 * host name here (own_name is true). The accepted fd, however, looks like it
 * is only closed on that function's later error paths (via remove_source()),
 * not when creating the source itself fails — confirm. */
static int dispatch_raw_connection_event(
                sd_event_source *event,
                int fd,
                uint32_t revents,
                void *userdata) {

        RemoteServer *s = userdata;
        SocketAddress addr = {
                .size = sizeof(union sockaddr_union),
                .type = SOCK_STREAM,
        };
        char *peer = NULL;
        int conn_fd;

        conn_fd = accept_connection("raw", fd, &addr, &peer);
        if (conn_fd < 0)
                return conn_fd == -EAGAIN ? 0 : conn_fd;

        return journal_remote_add_source(s, conn_fd, peer, true);
}
550