1 /* SPDX-License-Identifier: LGPL-2.1-or-later */
2 
3 #include "alloc-util.h"
4 #include "fd-util.h"
5 #include "journal-remote-parse.h"
6 #include "journald-native.h"
7 #include "parse-util.h"
8 #include "string-util.h"
9 
source_free(RemoteSource * source)10 void source_free(RemoteSource *source) {
11         if (!source)
12                 return;
13 
14         journal_importer_cleanup(&source->importer);
15 
16         log_debug("Writer ref count %i", source->writer->n_ref);
17         writer_unref(source->writer);
18 
19         sd_event_source_unref(source->event);
20         sd_event_source_unref(source->buffer_event);
21 
22         free(source);
23 }
24 
25 /**
26  * Initialize zero-filled source with given values. On success, takes
27  * ownership of fd, name, and writer, otherwise does not touch them.
28  */
source_new(int fd,bool passive_fd,char * name,Writer * writer)29 RemoteSource* source_new(int fd, bool passive_fd, char *name, Writer *writer) {
30         RemoteSource *source;
31 
32         log_debug("Creating source for %sfd:%d (%s)",
33                   passive_fd ? "passive " : "", fd, name);
34 
35         assert(fd >= 0);
36 
37         source = new0(RemoteSource, 1);
38         if (!source)
39                 return NULL;
40 
41         source->importer = JOURNAL_IMPORTER_MAKE(fd);
42         source->importer.passive_fd = passive_fd;
43         source->importer.name = name;
44 
45         source->writer = writer;
46 
47         return source;
48 }
49 
process_source(RemoteSource * source,JournalFileFlags file_flags)50 int process_source(RemoteSource *source, JournalFileFlags file_flags) {
51         int r;
52 
53         assert(source);
54         assert(source->writer);
55 
56         r = journal_importer_process_data(&source->importer);
57         if (r <= 0)
58                 return r;
59 
60         /* We have a full event */
61         log_trace("Received full event from source@%p fd:%d (%s)",
62                   source, source->importer.fd, source->importer.name);
63 
64         if (source->importer.iovw.count == 0) {
65                 log_warning("Entry with no payload, skipping");
66                 goto freeing;
67         }
68 
69         assert(source->importer.iovw.iovec);
70 
71         r = writer_write(source->writer,
72                          &source->importer.iovw,
73                          &source->importer.ts,
74                          &source->importer.boot_id,
75                          file_flags);
76         if (r == -EBADMSG) {
77                 log_warning_errno(r, "Entry is invalid, ignoring.");
78                 r = 0;
79         } else if (r < 0)
80                 log_error_errno(r, "Failed to write entry of %zu bytes: %m",
81                                 iovw_size(&source->importer.iovw));
82         else
83                 r = 1;
84 
85  freeing:
86         journal_importer_drop_iovw(&source->importer);
87         return r;
88 }
89