1 /* SPDX-License-Identifier: LGPL-2.1-or-later */
2 
3 #include <errno.h>
4 #include <fcntl.h>
5 #include <stdlib.h>
6 #include <sys/prctl.h>
7 #include <stdint.h>
8 
9 #include "sd-daemon.h"
10 
11 #include "af-list.h"
12 #include "alloc-util.h"
13 #include "def.h"
14 #include "errno-util.h"
15 #include "escape.h"
16 #include "fd-util.h"
17 #include "journal-remote-write.h"
18 #include "journal-remote.h"
19 #include "journald-native.h"
20 #include "macro.h"
21 #include "managed-journal-file.h"
22 #include "parse-util.h"
23 #include "process-util.h"
24 #include "socket-util.h"
25 #include "stdio-util.h"
26 #include "string-util.h"
27 #include "strv.h"
28 
29 #define REMOTE_JOURNAL_PATH "/var/log/journal/remote"
30 
31 #define filename_escape(s) xescape((s), "/ ")
32 
open_output(RemoteServer * s,Writer * w,const char * host)33 static int open_output(RemoteServer *s, Writer *w, const char* host) {
34         _cleanup_free_ char *_filename = NULL;
35         const char *filename;
36         int r;
37 
38         switch (s->split_mode) {
39         case JOURNAL_WRITE_SPLIT_NONE:
40                 filename = s->output;
41                 break;
42 
43         case JOURNAL_WRITE_SPLIT_HOST: {
44                 _cleanup_free_ char *name = NULL;
45 
46                 assert(host);
47 
48                 name = filename_escape(host);
49                 if (!name)
50                         return log_oom();
51 
52                 r = asprintf(&_filename, "%s/remote-%s.journal", s->output, name);
53                 if (r < 0)
54                         return log_oom();
55 
56                 filename = _filename;
57                 break;
58         }
59 
60         default:
61                 assert_not_reached();
62         }
63 
64         r = managed_journal_file_open_reliably(
65                         filename,
66                         O_RDWR|O_CREAT,
67                         s->file_flags,
68                         0640,
69                         UINT64_MAX,
70                         &w->metrics,
71                         w->mmap,
72                         NULL,
73                         NULL,
74                         &w->journal);
75         if (r < 0)
76                 return log_error_errno(r, "Failed to open output journal %s: %m", filename);
77 
78         log_debug("Opened output file %s", w->journal->file->path);
79         return 0;
80 }
81 
82 /**********************************************************************
83  **********************************************************************
84  **********************************************************************/
85 
init_writer_hashmap(RemoteServer * s)86 static int init_writer_hashmap(RemoteServer *s) {
87         static const struct hash_ops* const hash_ops[] = {
88                 [JOURNAL_WRITE_SPLIT_NONE] = NULL,
89                 [JOURNAL_WRITE_SPLIT_HOST] = &string_hash_ops,
90         };
91 
92         assert(s);
93         assert(s->split_mode >= 0 && s->split_mode < (int) ELEMENTSOF(hash_ops));
94 
95         s->writers = hashmap_new(hash_ops[s->split_mode]);
96         if (!s->writers)
97                 return log_oom();
98 
99         return 0;
100 }
101 
journal_remote_get_writer(RemoteServer * s,const char * host,Writer ** writer)102 int journal_remote_get_writer(RemoteServer *s, const char *host, Writer **writer) {
103         _cleanup_(writer_unrefp) Writer *w = NULL;
104         const void *key;
105         int r;
106 
107         switch (s->split_mode) {
108         case JOURNAL_WRITE_SPLIT_NONE:
109                 key = "one and only";
110                 break;
111 
112         case JOURNAL_WRITE_SPLIT_HOST:
113                 assert(host);
114                 key = host;
115                 break;
116 
117         default:
118                 assert_not_reached();
119         }
120 
121         w = hashmap_get(s->writers, key);
122         if (w)
123                 writer_ref(w);
124         else {
125                 w = writer_new(s);
126                 if (!w)
127                         return log_oom();
128 
129                 if (s->split_mode == JOURNAL_WRITE_SPLIT_HOST) {
130                         w->hashmap_key = strdup(key);
131                         if (!w->hashmap_key)
132                                 return log_oom();
133                 }
134 
135                 r = open_output(s, w, host);
136                 if (r < 0)
137                         return r;
138 
139                 r = hashmap_put(s->writers, w->hashmap_key ?: key, w);
140                 if (r < 0)
141                         return r;
142         }
143 
144         *writer = TAKE_PTR(w);
145 
146         return 0;
147 }
148 
149 /**********************************************************************
150  **********************************************************************
151  **********************************************************************/
152 
153 /* This should go away as soon as µhttpd allows state to be passed around. */
154 RemoteServer *journal_remote_server_global;
155 
156 static int dispatch_raw_source_event(sd_event_source *event,
157                                      int fd,
158                                      uint32_t revents,
159                                      void *userdata);
160 static int dispatch_raw_source_until_block(sd_event_source *event,
161                                            void *userdata);
162 static int dispatch_blocking_source_event(sd_event_source *event,
163                                           void *userdata);
164 static int dispatch_raw_connection_event(sd_event_source *event,
165                                          int fd,
166                                          uint32_t revents,
167                                          void *userdata);
168 
get_source_for_fd(RemoteServer * s,int fd,char * name,RemoteSource ** source)169 static int get_source_for_fd(RemoteServer *s,
170                              int fd, char *name, RemoteSource **source) {
171         Writer *writer;
172         int r;
173 
174         /* This takes ownership of name, but only on success. */
175 
176         assert(fd >= 0);
177         assert(source);
178 
179         if (!GREEDY_REALLOC0(s->sources, fd + 1))
180                 return log_oom();
181 
182         r = journal_remote_get_writer(s, name, &writer);
183         if (r < 0)
184                 return log_warning_errno(r, "Failed to get writer for source %s: %m",
185                                          name);
186 
187         if (!s->sources[fd]) {
188                 s->sources[fd] = source_new(fd, false, name, writer);
189                 if (!s->sources[fd]) {
190                         writer_unref(writer);
191                         return log_oom();
192                 }
193 
194                 s->active++;
195         }
196 
197         *source = s->sources[fd];
198         return 0;
199 }
200 
remove_source(RemoteServer * s,int fd)201 static int remove_source(RemoteServer *s, int fd) {
202         RemoteSource *source;
203 
204         assert(s);
205         assert(fd >= 0 && fd < (ssize_t) MALLOC_ELEMENTSOF(s->sources));
206 
207         source = s->sources[fd];
208         if (source) {
209                 /* this closes fd too */
210                 source_free(source);
211                 s->sources[fd] = NULL;
212                 s->active--;
213         }
214 
215         return 0;
216 }
217 
journal_remote_add_source(RemoteServer * s,int fd,char * name,bool own_name)218 int journal_remote_add_source(RemoteServer *s, int fd, char* name, bool own_name) {
219         RemoteSource *source = NULL;
220         int r;
221 
222         /* This takes ownership of name, even on failure, if own_name is true. */
223 
224         assert(s);
225         assert(fd >= 0);
226         assert(name);
227 
228         if (!own_name) {
229                 name = strdup(name);
230                 if (!name)
231                         return log_oom();
232         }
233 
234         r = get_source_for_fd(s, fd, name, &source);
235         if (r < 0) {
236                 log_error_errno(r, "Failed to create source for fd:%d (%s): %m",
237                                 fd, name);
238                 free(name);
239                 return r;
240         }
241 
242         r = sd_event_add_io(s->events, &source->event,
243                             fd, EPOLLIN|EPOLLRDHUP|EPOLLPRI,
244                             dispatch_raw_source_event, source);
245         if (r == 0) {
246                 /* Add additional source for buffer processing. It will be
247                  * enabled later. */
248                 r = sd_event_add_defer(s->events, &source->buffer_event,
249                                        dispatch_raw_source_until_block, source);
250                 if (r == 0)
251                         r = sd_event_source_set_enabled(source->buffer_event, SD_EVENT_OFF);
252         } else if (r == -EPERM) {
253                 log_debug("Falling back to sd_event_add_defer for fd:%d (%s)", fd, name);
254                 r = sd_event_add_defer(s->events, &source->event,
255                                        dispatch_blocking_source_event, source);
256                 if (r == 0)
257                         r = sd_event_source_set_enabled(source->event, SD_EVENT_ON);
258         }
259         if (r < 0) {
260                 log_error_errno(r, "Failed to register event source for fd:%d: %m",
261                                 fd);
262                 goto error;
263         }
264 
265         r = sd_event_source_set_description(source->event, name);
266         if (r < 0) {
267                 log_error_errno(r, "Failed to set source name for fd:%d: %m", fd);
268                 goto error;
269         }
270 
271         return 1; /* work to do */
272 
273  error:
274         remove_source(s, fd);
275         return r;
276 }
277 
journal_remote_add_raw_socket(RemoteServer * s,int fd)278 int journal_remote_add_raw_socket(RemoteServer *s, int fd) {
279         int r;
280         _unused_ _cleanup_close_ int fd_ = fd;
281         char name[STRLEN("raw-socket-") + DECIMAL_STR_MAX(int) + 1];
282 
283         assert(fd >= 0);
284 
285         r = sd_event_add_io(s->events, &s->listen_event,
286                             fd, EPOLLIN,
287                             dispatch_raw_connection_event, s);
288         if (r < 0)
289                 return r;
290 
291         xsprintf(name, "raw-socket-%d", fd);
292 
293         r = sd_event_source_set_description(s->listen_event, name);
294         if (r < 0)
295                 return r;
296 
297         fd_ = -1;
298         s->active++;
299         return 0;
300 }
301 
302 /**********************************************************************
303  **********************************************************************
304  **********************************************************************/
305 
journal_remote_server_init(RemoteServer * s,const char * output,JournalWriteSplitMode split_mode,JournalFileFlags file_flags)306 int journal_remote_server_init(
307                 RemoteServer *s,
308                 const char *output,
309                 JournalWriteSplitMode split_mode,
310                 JournalFileFlags file_flags) {
311 
312         int r;
313 
314         assert(s);
315 
316         assert(journal_remote_server_global == NULL);
317         journal_remote_server_global = s;
318 
319         s->split_mode = split_mode;
320         s->file_flags = file_flags;
321 
322         if (output)
323                 s->output = output;
324         else if (split_mode == JOURNAL_WRITE_SPLIT_NONE)
325                 s->output = REMOTE_JOURNAL_PATH "/remote.journal";
326         else if (split_mode == JOURNAL_WRITE_SPLIT_HOST)
327                 s->output = REMOTE_JOURNAL_PATH;
328         else
329                 assert_not_reached();
330 
331         r = sd_event_default(&s->events);
332         if (r < 0)
333                 return log_error_errno(r, "Failed to allocate event loop: %m");
334 
335         r = init_writer_hashmap(s);
336         if (r < 0)
337                 return r;
338 
339         return 0;
340 }
341 
342 #if HAVE_MICROHTTPD
MHDDaemonWrapper_free(MHDDaemonWrapper * d)343 static void MHDDaemonWrapper_free(MHDDaemonWrapper *d) {
344         MHD_stop_daemon(d->daemon);
345         sd_event_source_unref(d->io_event);
346         sd_event_source_unref(d->timer_event);
347         free(d);
348 }
349 #endif
350 
journal_remote_server_destroy(RemoteServer * s)351 void journal_remote_server_destroy(RemoteServer *s) {
352         size_t i;
353 
354 #if HAVE_MICROHTTPD
355         hashmap_free_with_destructor(s->daemons, MHDDaemonWrapper_free);
356 #endif
357 
358         for (i = 0; i < MALLOC_ELEMENTSOF(s->sources); i++)
359                 remove_source(s, i);
360         free(s->sources);
361 
362         writer_unref(s->_single_writer);
363         hashmap_free(s->writers);
364 
365         sd_event_source_unref(s->sigterm_event);
366         sd_event_source_unref(s->sigint_event);
367         sd_event_source_unref(s->listen_event);
368         sd_event_unref(s->events);
369 
370         if (s == journal_remote_server_global)
371                 journal_remote_server_global = NULL;
372 
373         /* fds that we're listening on remain open... */
374 }
375 
376 /**********************************************************************
377  **********************************************************************
378  **********************************************************************/
379 
journal_remote_handle_raw_source(sd_event_source * event,int fd,uint32_t revents,RemoteServer * s)380 int journal_remote_handle_raw_source(
381                 sd_event_source *event,
382                 int fd,
383                 uint32_t revents,
384                 RemoteServer *s) {
385 
386         RemoteSource *source;
387         int r;
388 
389         /* Returns 1 if there might be more data pending,
390          * 0 if data is currently exhausted, negative on error.
391          */
392 
393         assert(fd >= 0 && fd < (ssize_t) MALLOC_ELEMENTSOF(s->sources));
394         source = s->sources[fd];
395         assert(source->importer.fd == fd);
396 
397         r = process_source(source, s->file_flags);
398         if (journal_importer_eof(&source->importer)) {
399                 size_t remaining;
400 
401                 log_debug("EOF reached with source %s (fd=%d)",
402                           source->importer.name, source->importer.fd);
403 
404                 remaining = journal_importer_bytes_remaining(&source->importer);
405                 if (remaining > 0)
406                         log_notice("Premature EOF. %zu bytes lost.", remaining);
407                 remove_source(s, source->importer.fd);
408                 log_debug("%zu active sources remaining", s->active);
409                 return 0;
410         } else if (r == -E2BIG) {
411                 log_notice("Entry with too many fields, skipped");
412                 return 1;
413         } else if (r == -ENOBUFS) {
414                 log_notice("Entry too big, skipped");
415                 return 1;
416         } else if (r == -EAGAIN) {
417                 return 0;
418         } else if (r < 0) {
419                 log_debug_errno(r, "Closing connection: %m");
420                 remove_source(s, fd);
421                 return 0;
422         } else
423                 return 1;
424 }
425 
dispatch_raw_source_until_block(sd_event_source * event,void * userdata)426 static int dispatch_raw_source_until_block(sd_event_source *event,
427                                            void *userdata) {
428         RemoteSource *source = userdata;
429         int r;
430 
431         /* Make sure event stays around even if source is destroyed */
432         sd_event_source_ref(event);
433 
434         r = journal_remote_handle_raw_source(event, source->importer.fd, EPOLLIN, journal_remote_server_global);
435         if (r != 1) {
436                 int k;
437 
438                 /* No more data for now */
439                 k = sd_event_source_set_enabled(event, SD_EVENT_OFF);
440                 if (k < 0)
441                         r = k;
442         }
443 
444         sd_event_source_unref(event);
445 
446         return r;
447 }
448 
dispatch_raw_source_event(sd_event_source * event,int fd,uint32_t revents,void * userdata)449 static int dispatch_raw_source_event(sd_event_source *event,
450                                      int fd,
451                                      uint32_t revents,
452                                      void *userdata) {
453         RemoteSource *source = userdata;
454         int r;
455 
456         assert(source->event);
457         assert(source->buffer_event);
458 
459         r = journal_remote_handle_raw_source(event, fd, EPOLLIN, journal_remote_server_global);
460         if (r == 1) {
461                 int k;
462 
463                 /* Might have more data. We need to rerun the handler
464                  * until we are sure the buffer is exhausted. */
465                 k = sd_event_source_set_enabled(source->buffer_event, SD_EVENT_ON);
466                 if (k < 0)
467                         r = k;
468         }
469 
470         return r;
471 }
472 
dispatch_blocking_source_event(sd_event_source * event,void * userdata)473 static int dispatch_blocking_source_event(sd_event_source *event,
474                                           void *userdata) {
475         RemoteSource *source = userdata;
476 
477         return journal_remote_handle_raw_source(event, source->importer.fd, EPOLLIN, journal_remote_server_global);
478 }
479 
accept_connection(const char * type,int fd,SocketAddress * addr,char ** hostname)480 static int accept_connection(
481                 const char* type,
482                 int fd,
483                 SocketAddress *addr,
484                 char **hostname) {
485 
486         _cleanup_close_ int fd2 = -1;
487         int r;
488 
489         log_debug("Accepting new %s connection on fd:%d", type, fd);
490         fd2 = accept4(fd, &addr->sockaddr.sa, &addr->size, SOCK_NONBLOCK|SOCK_CLOEXEC);
491         if (fd2 < 0) {
492                 if (ERRNO_IS_ACCEPT_AGAIN(errno))
493                         return -EAGAIN;
494 
495                 return log_error_errno(errno, "accept() on fd:%d failed: %m", fd);
496         }
497 
498         switch (socket_address_family(addr)) {
499         case AF_INET:
500         case AF_INET6: {
501                 _cleanup_free_ char *a = NULL;
502                 char *b;
503 
504                 r = socket_address_print(addr, &a);
505                 if (r < 0)
506                         return log_error_errno(r, "socket_address_print(): %m");
507 
508                 r = socknameinfo_pretty(&addr->sockaddr, addr->size, &b);
509                 if (r < 0)
510                         return log_error_errno(r, "Resolving hostname failed: %m");
511 
512                 log_debug("Accepted %s %s connection from %s",
513                           type,
514                           af_to_ipv4_ipv6(socket_address_family(addr)),
515                           a);
516 
517                 *hostname = b;
518                 return TAKE_FD(fd2);
519         }
520 
521         default:
522                 return log_error_errno(SYNTHETIC_ERRNO(EINVAL),
523                                        "Rejected %s connection with unsupported family %d",
524                                        type, socket_address_family(addr));
525         }
526 }
527 
dispatch_raw_connection_event(sd_event_source * event,int fd,uint32_t revents,void * userdata)528 static int dispatch_raw_connection_event(
529                 sd_event_source *event,
530                 int fd,
531                 uint32_t revents,
532                 void *userdata) {
533 
534         RemoteServer *s = userdata;
535         int fd2;
536         SocketAddress addr = {
537                 .size = sizeof(union sockaddr_union),
538                 .type = SOCK_STREAM,
539         };
540         char *hostname = NULL;
541 
542         fd2 = accept_connection("raw", fd, &addr, &hostname);
543         if (fd2 == -EAGAIN)
544                 return 0;
545         if (fd2 < 0)
546                 return fd2;
547 
548         return journal_remote_add_source(s, fd2, hostname, true);
549 }
550