1 /* SPDX-License-Identifier: LGPL-2.1-or-later */
2 
3 #include <errno.h>
4 #include <malloc.h>
5 #include <unistd.h>
6 
7 #include "alloc-util.h"
8 #include "errno-util.h"
9 #include "escape.h"
10 #include "fd-util.h"
11 #include "io-util.h"
12 #include "journal-file.h"
13 #include "journal-importer.h"
14 #include "journal-util.h"
15 #include "parse-util.h"
16 #include "string-util.h"
17 #include "unaligned.h"
18 
19 enum {
20         IMPORTER_STATE_LINE = 0,    /* waiting to read, or reading line */
21         IMPORTER_STATE_DATA_START,  /* reading binary data header */
22         IMPORTER_STATE_DATA,        /* reading binary data */
23         IMPORTER_STATE_DATA_FINISH, /* expecting newline */
24         IMPORTER_STATE_EOF,         /* done */
25 };
26 
journal_importer_cleanup(JournalImporter * imp)27 void journal_importer_cleanup(JournalImporter *imp) {
28         if (imp->fd >= 0 && !imp->passive_fd) {
29                 log_debug("Closing %s (fd=%d)", imp->name ?: "importer", imp->fd);
30                 safe_close(imp->fd);
31         }
32 
33         free(imp->name);
34         free(imp->buf);
35         iovw_free_contents(&imp->iovw, false);
36 }
37 
realloc_buffer(JournalImporter * imp,size_t size)38 static char* realloc_buffer(JournalImporter *imp, size_t size) {
39         char *b, *old = imp->buf;
40 
41         b = GREEDY_REALLOC(imp->buf, size);
42         if (!b)
43                 return NULL;
44 
45         iovw_rebase(&imp->iovw, old, imp->buf);
46 
47         return b;
48 }
49 
get_line(JournalImporter * imp,char ** line,size_t * size)50 static int get_line(JournalImporter *imp, char **line, size_t *size) {
51         ssize_t n;
52         char *c = NULL;
53 
54         assert(imp);
55         assert(imp->state == IMPORTER_STATE_LINE);
56         assert(imp->offset <= imp->filled);
57         assert(imp->filled <= MALLOC_SIZEOF_SAFE(imp->buf));
58         assert(imp->fd >= 0);
59 
60         for (;;) {
61                 if (imp->buf) {
62                         size_t start = MAX(imp->scanned, imp->offset);
63 
64                         c = memchr(imp->buf + start, '\n',
65                                    imp->filled - start);
66                         if (c)
67                                 break;
68                 }
69 
70                 imp->scanned = imp->filled;
71                 if (imp->scanned >= DATA_SIZE_MAX)
72                         return log_warning_errno(SYNTHETIC_ERRNO(ENOBUFS),
73                                                  "Entry is bigger than %u bytes.",
74                                                  DATA_SIZE_MAX);
75 
76                 if (imp->passive_fd)
77                         /* we have to wait for some data to come to us */
78                         return -EAGAIN;
79 
80                 /* We know that imp->filled is at most DATA_SIZE_MAX, so if
81                    we reallocate it, we'll increase the size at least a bit. */
82                 assert_cc(DATA_SIZE_MAX < ENTRY_SIZE_MAX);
83                 if (MALLOC_SIZEOF_SAFE(imp->buf) - imp->filled < LINE_CHUNK &&
84                     !realloc_buffer(imp, MIN(imp->filled + LINE_CHUNK, ENTRY_SIZE_MAX)))
85                                 return log_oom();
86 
87                 assert(imp->buf);
88                 assert(MALLOC_SIZEOF_SAFE(imp->buf) - imp->filled >= LINE_CHUNK ||
89                        MALLOC_SIZEOF_SAFE(imp->buf) >= ENTRY_SIZE_MAX);
90 
91                 n = read(imp->fd,
92                          imp->buf + imp->filled,
93                          MALLOC_SIZEOF_SAFE(imp->buf) - imp->filled);
94                 if (n < 0) {
95                         if (errno != EAGAIN)
96                                 log_error_errno(errno, "read(%d, ..., %zu): %m",
97                                                 imp->fd,
98                                                 MALLOC_SIZEOF_SAFE(imp->buf) - imp->filled);
99                         return -errno;
100                 } else if (n == 0)
101                         return 0;
102 
103                 imp->filled += n;
104         }
105 
106         *line = imp->buf + imp->offset;
107         *size = c + 1 - imp->buf - imp->offset;
108         imp->offset += *size;
109 
110         return 1;
111 }
112 
fill_fixed_size(JournalImporter * imp,void ** data,size_t size)113 static int fill_fixed_size(JournalImporter *imp, void **data, size_t size) {
114 
115         assert(imp);
116         assert(IN_SET(imp->state, IMPORTER_STATE_DATA_START, IMPORTER_STATE_DATA, IMPORTER_STATE_DATA_FINISH));
117         assert(size <= DATA_SIZE_MAX);
118         assert(imp->offset <= imp->filled);
119         assert(imp->filled <= MALLOC_SIZEOF_SAFE(imp->buf));
120         assert(imp->fd >= 0);
121         assert(data);
122 
123         while (imp->filled - imp->offset < size) {
124                 int n;
125 
126                 if (imp->passive_fd)
127                         /* we have to wait for some data to come to us */
128                         return -EAGAIN;
129 
130                 if (!realloc_buffer(imp, imp->offset + size))
131                         return log_oom();
132 
133                 n = read(imp->fd, imp->buf + imp->filled,
134                          MALLOC_SIZEOF_SAFE(imp->buf) - imp->filled);
135                 if (n < 0) {
136                         if (errno != EAGAIN)
137                                 log_error_errno(errno, "read(%d, ..., %zu): %m", imp->fd,
138                                                 MALLOC_SIZEOF_SAFE(imp->buf) - imp->filled);
139                         return -errno;
140                 } else if (n == 0)
141                         return 0;
142 
143                 imp->filled += n;
144         }
145 
146         *data = imp->buf + imp->offset;
147         imp->offset += size;
148 
149         return 1;
150 }
151 
get_data_size(JournalImporter * imp)152 static int get_data_size(JournalImporter *imp) {
153         int r;
154         void *data;
155 
156         assert(imp);
157         assert(imp->state == IMPORTER_STATE_DATA_START);
158         assert(imp->data_size == 0);
159 
160         r = fill_fixed_size(imp, &data, sizeof(uint64_t));
161         if (r <= 0)
162                 return r;
163 
164         imp->data_size = unaligned_read_le64(data);
165         if (imp->data_size > DATA_SIZE_MAX)
166                 return log_warning_errno(SYNTHETIC_ERRNO(EINVAL),
167                                          "Stream declares field with size %zu > DATA_SIZE_MAX = %u",
168                                          imp->data_size, DATA_SIZE_MAX);
169         if (imp->data_size == 0)
170                 log_warning("Binary field with zero length");
171 
172         return 1;
173 }
174 
get_data_data(JournalImporter * imp,void ** data)175 static int get_data_data(JournalImporter *imp, void **data) {
176         int r;
177 
178         assert(imp);
179         assert(data);
180         assert(imp->state == IMPORTER_STATE_DATA);
181 
182         r = fill_fixed_size(imp, data, imp->data_size);
183         if (r <= 0)
184                 return r;
185 
186         return 1;
187 }
188 
get_data_newline(JournalImporter * imp)189 static int get_data_newline(JournalImporter *imp) {
190         int r;
191         char *data;
192 
193         assert(imp);
194         assert(imp->state == IMPORTER_STATE_DATA_FINISH);
195 
196         r = fill_fixed_size(imp, (void**) &data, 1);
197         if (r <= 0)
198                 return r;
199 
200         assert(data);
201         if (*data != '\n') {
202                 char buf[4];
203                 int l;
204 
205                 l = cescape_char(*data, buf);
206                 return log_warning_errno(SYNTHETIC_ERRNO(EINVAL),
207                                          "Expected newline, got '%.*s'", l, buf);
208         }
209 
210         return 1;
211 }
212 
process_special_field(JournalImporter * imp,char * line)213 static int process_special_field(JournalImporter *imp, char *line) {
214         const char *value;
215         char buf[CELLESCAPE_DEFAULT_LENGTH];
216         int r;
217 
218         assert(line);
219 
220         value = startswith(line, "__CURSOR=");
221         if (value)
222                 /* ignore __CURSOR */
223                 return 1;
224 
225         value = startswith(line, "__REALTIME_TIMESTAMP=");
226         if (value) {
227                 uint64_t x;
228 
229                 r = safe_atou64(value, &x);
230                 if (r < 0)
231                         return log_warning_errno(r, "Failed to parse __REALTIME_TIMESTAMP '%s': %m",
232                                                  cellescape(buf, sizeof buf, value));
233                 else if (!VALID_REALTIME(x)) {
234                         log_warning("__REALTIME_TIMESTAMP out of range, ignoring: %"PRIu64, x);
235                         return -ERANGE;
236                 }
237 
238                 imp->ts.realtime = x;
239                 return 1;
240         }
241 
242         value = startswith(line, "__MONOTONIC_TIMESTAMP=");
243         if (value) {
244                 uint64_t x;
245 
246                 r = safe_atou64(value, &x);
247                 if (r < 0)
248                         return log_warning_errno(r, "Failed to parse __MONOTONIC_TIMESTAMP '%s': %m",
249                                                  cellescape(buf, sizeof buf, value));
250                 else if (!VALID_MONOTONIC(x)) {
251                         log_warning("__MONOTONIC_TIMESTAMP out of range, ignoring: %"PRIu64, x);
252                         return -ERANGE;
253                 }
254 
255                 imp->ts.monotonic = x;
256                 return 1;
257         }
258 
259         /* Just a single underline, but it needs special treatment too. */
260         value = startswith(line, "_BOOT_ID=");
261         if (value) {
262                 r = sd_id128_from_string(value, &imp->boot_id);
263                 if (r < 0)
264                         return log_warning_errno(r, "Failed to parse _BOOT_ID '%s': %m",
265                                                  cellescape(buf, sizeof buf, value));
266 
267                 /* store the field in the usual fashion too */
268                 return 0;
269         }
270 
271         value = startswith(line, "__");
272         if (value) {
273                 log_notice("Unknown dunder line __%s, ignoring.", cellescape(buf, sizeof buf, value));
274                 return 1;
275         }
276 
277         /* no dunder */
278         return 0;
279 }
280 
journal_importer_process_data(JournalImporter * imp)281 int journal_importer_process_data(JournalImporter *imp) {
282         int r;
283 
284         switch (imp->state) {
285         case IMPORTER_STATE_LINE: {
286                 char *line, *sep;
287                 size_t n = 0;
288 
289                 assert(imp->data_size == 0);
290 
291                 r = get_line(imp, &line, &n);
292                 if (r < 0)
293                         return r;
294                 if (r == 0) {
295                         imp->state = IMPORTER_STATE_EOF;
296                         return 0;
297                 }
298                 assert(n > 0);
299                 assert(line[n-1] == '\n');
300 
301                 if (n == 1) {
302                         log_trace("Received empty line, event is ready");
303                         return 1;
304                 }
305 
306                 /* MESSAGE=xxx\n
307                    or
308                    COREDUMP\n
309                    LLLLLLLL0011223344...\n
310                 */
311                 sep = memchr(line, '=', n);
312                 if (sep) {
313                         /* chomp newline */
314                         n--;
315 
316                         if (!journal_field_valid(line, sep - line, true)) {
317                                 char buf[64], *t;
318 
319                                 t = strndupa_safe(line, sep - line);
320                                 log_debug("Ignoring invalid field: \"%s\"",
321                                           cellescape(buf, sizeof buf, t));
322 
323                                 return 0;
324                         }
325 
326                         line[n] = '\0';
327                         r = process_special_field(imp, line);
328                         if (r != 0)
329                                 return r < 0 ? r : 0;
330 
331                         r = iovw_put(&imp->iovw, line, n);
332                         if (r < 0)
333                                 return r;
334                 } else {
335                         if (!journal_field_valid(line, n - 1, true)) {
336                                 char buf[64], *t;
337 
338                                 t = strndupa_safe(line, n - 1);
339                                 log_debug("Ignoring invalid field: \"%s\"",
340                                           cellescape(buf, sizeof buf, t));
341 
342                                 return 0;
343                         }
344 
345                         /* replace \n with = */
346                         line[n-1] = '=';
347 
348                         imp->field_len = n;
349                         imp->state = IMPORTER_STATE_DATA_START;
350 
351                         /* we cannot put the field in iovec until we have all data */
352                 }
353 
354                 log_trace("Received: %.*s (%s)", (int) n, line, sep ? "text" : "binary");
355 
356                 return 0; /* continue */
357         }
358 
359         case IMPORTER_STATE_DATA_START:
360                 assert(imp->data_size == 0);
361 
362                 r = get_data_size(imp);
363                 // log_debug("get_data_size() -> %d", r);
364                 if (r < 0)
365                         return r;
366                 if (r == 0) {
367                         imp->state = IMPORTER_STATE_EOF;
368                         return 0;
369                 }
370 
371                 imp->state = imp->data_size > 0 ?
372                         IMPORTER_STATE_DATA : IMPORTER_STATE_DATA_FINISH;
373 
374                 return 0; /* continue */
375 
376         case IMPORTER_STATE_DATA: {
377                 void *data;
378                 char *field;
379 
380                 assert(imp->data_size > 0);
381 
382                 r = get_data_data(imp, &data);
383                 // log_debug("get_data_data() -> %d", r);
384                 if (r < 0)
385                         return r;
386                 if (r == 0) {
387                         imp->state = IMPORTER_STATE_EOF;
388                         return 0;
389                 }
390 
391                 assert(data);
392 
393                 field = (char*) data - sizeof(uint64_t) - imp->field_len;
394                 memmove(field + sizeof(uint64_t), field, imp->field_len);
395 
396                 r = iovw_put(&imp->iovw, field + sizeof(uint64_t), imp->field_len + imp->data_size);
397                 if (r < 0)
398                         return r;
399 
400                 imp->state = IMPORTER_STATE_DATA_FINISH;
401 
402                 return 0; /* continue */
403         }
404 
405         case IMPORTER_STATE_DATA_FINISH:
406                 r = get_data_newline(imp);
407                 // log_debug("get_data_newline() -> %d", r);
408                 if (r < 0)
409                         return r;
410                 if (r == 0) {
411                         imp->state = IMPORTER_STATE_EOF;
412                         return 0;
413                 }
414 
415                 imp->data_size = 0;
416                 imp->state = IMPORTER_STATE_LINE;
417 
418                 return 0; /* continue */
419         default:
420                 assert_not_reached();
421         }
422 }
423 
journal_importer_push_data(JournalImporter * imp,const char * data,size_t size)424 int journal_importer_push_data(JournalImporter *imp, const char *data, size_t size) {
425         assert(imp);
426         assert(imp->state != IMPORTER_STATE_EOF);
427 
428         if (!realloc_buffer(imp, imp->filled + size))
429                 return log_error_errno(SYNTHETIC_ERRNO(ENOMEM),
430                                        "Failed to store received data of size %zu "
431                                        "(in addition to existing %zu bytes with %zu filled): %s",
432                                        size, MALLOC_SIZEOF_SAFE(imp->buf), imp->filled,
433                                        strerror_safe(ENOMEM));
434 
435         memcpy(imp->buf + imp->filled, data, size);
436         imp->filled += size;
437 
438         return 0;
439 }
440 
journal_importer_drop_iovw(JournalImporter * imp)441 void journal_importer_drop_iovw(JournalImporter *imp) {
442         size_t remain, target;
443 
444         /* This function drops processed data that along with the iovw that points at it */
445 
446         iovw_free_contents(&imp->iovw, false);
447 
448         /* possibly reset buffer position */
449         remain = imp->filled - imp->offset;
450 
451         if (remain == 0) /* no brainer */
452                 imp->offset = imp->scanned = imp->filled = 0;
453         else if (imp->offset > MALLOC_SIZEOF_SAFE(imp->buf) - imp->filled &&
454                  imp->offset > remain) {
455                 memcpy(imp->buf, imp->buf + imp->offset, remain);
456                 imp->offset = imp->scanned = 0;
457                 imp->filled = remain;
458         }
459 
460         target = MALLOC_SIZEOF_SAFE(imp->buf);
461         while (target > 16 * LINE_CHUNK && imp->filled < target / 2)
462                 target /= 2;
463         if (target < MALLOC_SIZEOF_SAFE(imp->buf)) {
464                 char *tmp;
465                 size_t old_size;
466 
467                 old_size = MALLOC_SIZEOF_SAFE(imp->buf);
468 
469                 tmp = realloc(imp->buf, target);
470                 if (!tmp)
471                         log_warning("Failed to reallocate buffer to (smaller) size %zu",
472                                     target);
473                 else {
474                         log_debug("Reallocated buffer from %zu to %zu bytes",
475                                   old_size, target);
476                         imp->buf = tmp;
477                 }
478         }
479 }
480 
journal_importer_eof(const JournalImporter * imp)481 bool journal_importer_eof(const JournalImporter *imp) {
482         return imp->state == IMPORTER_STATE_EOF;
483 }
484