1 /* SPDX-License-Identifier: LGPL-2.1-or-later */
2
3 #include <errno.h>
4 #include <malloc.h>
5 #include <unistd.h>
6
7 #include "alloc-util.h"
8 #include "errno-util.h"
9 #include "escape.h"
10 #include "fd-util.h"
11 #include "io-util.h"
12 #include "journal-file.h"
13 #include "journal-importer.h"
14 #include "journal-util.h"
15 #include "parse-util.h"
16 #include "string-util.h"
17 #include "unaligned.h"
18
/* States of the importer's parser; the values are consecutive, starting at zero. */
enum {
        IMPORTER_STATE_LINE        = 0, /* waiting to read, or reading line */
        IMPORTER_STATE_DATA_START  = 1, /* reading binary data header */
        IMPORTER_STATE_DATA        = 2, /* reading binary data */
        IMPORTER_STATE_DATA_FINISH = 3, /* expecting newline */
        IMPORTER_STATE_EOF         = 4, /* done */
};
26
/* Releases the resources held by an importer: the file descriptor is closed only when it is valid
 * and not flagged as passive; the name, the read buffer and the iovec wrapper contents are freed.
 * The structure itself is not freed, and its fields are not reset. */
void journal_importer_cleanup(JournalImporter *imp) {
        bool close_fd = imp->fd >= 0 && !imp->passive_fd;

        if (close_fd) {
                log_debug("Closing %s (fd=%d)", imp->name ?: "importer", imp->fd);
                safe_close(imp->fd);
        }

        free(imp->buf);
        free(imp->name);
        iovw_free_contents(&imp->iovw, false);
}
37
/* Grows the importer's read buffer so that it can hold at least 'size' bytes, and rebases the
 * iovec entries from the previous buffer address onto the current one.
 *
 * Returns the value produced by GREEDY_REALLOC(), or NULL if the allocation failed (in which case
 * the iovec entries are left untouched). */
static char* realloc_buffer(JournalImporter *imp, size_t size) {
        char *previous = imp->buf;
        char *grown;

        grown = GREEDY_REALLOC(imp->buf, size);
        if (grown)
                iovw_rebase(&imp->iovw, previous, imp->buf);

        return grown;
}
49
/* Extracts the next newline-terminated line from the importer's buffer, reading more data from
 * imp->fd as needed.
 *
 * On success returns 1, points *line at the start of the line inside the buffer and stores in
 * *size its length including the trailing '\n'; imp->offset is advanced past the line.
 * Returns 0 when read() reports end of file before a newline was seen.
 * Returns -EAGAIN when no complete line is buffered and the fd is passive (data has to be pushed
 * in by the caller), an ENOBUFS error when the buffered data reaches DATA_SIZE_MAX without a
 * newline, the result of log_oom() on allocation failure, or -errno when read() fails. */
static int get_line(JournalImporter *imp, char **line, size_t *size) {
        char *newline = NULL;

        assert(imp);
        assert(imp->state == IMPORTER_STATE_LINE);
        assert(imp->offset <= imp->filled);
        assert(imp->filled <= MALLOC_SIZEOF_SAFE(imp->buf));
        assert(imp->fd >= 0);

        for (;;) {
                size_t room;
                ssize_t got;

                if (imp->buf) {
                        /* Bytes before imp->scanned were already searched on an earlier pass. */
                        size_t from = MAX(imp->scanned, imp->offset);

                        newline = memchr(imp->buf + from, '\n', imp->filled - from);
                        if (newline) {
                                *line = imp->buf + imp->offset;
                                *size = newline + 1 - imp->buf - imp->offset;
                                imp->offset += *size;

                                return 1;
                        }
                }

                /* Nothing found: remember how far we looked, so the next search starts there. */
                imp->scanned = imp->filled;
                if (imp->scanned >= DATA_SIZE_MAX)
                        return log_warning_errno(SYNTHETIC_ERRNO(ENOBUFS),
                                                 "Entry is bigger than %u bytes.",
                                                 DATA_SIZE_MAX);

                /* A passive fd is never read from here; we have to wait for data to be handed to us. */
                if (imp->passive_fd)
                        return -EAGAIN;

                /* imp->filled is below DATA_SIZE_MAX at this point, so whenever we reallocate the
                 * buffer actually gets larger. */
                assert_cc(DATA_SIZE_MAX < ENTRY_SIZE_MAX);
                if (MALLOC_SIZEOF_SAFE(imp->buf) - imp->filled < LINE_CHUNK) {
                        if (!realloc_buffer(imp, MIN(imp->filled + LINE_CHUNK, ENTRY_SIZE_MAX)))
                                return log_oom();
                }

                assert(imp->buf);
                room = MALLOC_SIZEOF_SAFE(imp->buf) - imp->filled;
                assert(room >= LINE_CHUNK ||
                       MALLOC_SIZEOF_SAFE(imp->buf) >= ENTRY_SIZE_MAX);

                got = read(imp->fd, imp->buf + imp->filled, room);
                if (got < 0) {
                        /* EAGAIN is passed up silently, everything else is logged first. */
                        if (errno != EAGAIN)
                                log_error_errno(errno, "read(%d, ..., %zu): %m",
                                                imp->fd,
                                                room);
                        return -errno;
                }
                if (got == 0)
                        return 0;

                imp->filled += got;
        }
}
112
/* Makes sure that at least 'size' unconsumed bytes are available in the importer's buffer,
 * reading from imp->fd as necessary, and then consumes them.
 *
 * On success returns 1, points *data at the first of the 'size' bytes inside the buffer and
 * advances imp->offset past them.
 * Returns 0 when read() reports end of file before enough bytes arrived.
 * Returns -EAGAIN when not enough data is buffered and the fd is passive, the result of log_oom()
 * on allocation failure, or -errno when read() fails. */
static int fill_fixed_size(JournalImporter *imp, void **data, size_t size) {

        assert(imp);
        assert(IN_SET(imp->state, IMPORTER_STATE_DATA_START, IMPORTER_STATE_DATA, IMPORTER_STATE_DATA_FINISH));
        assert(size <= DATA_SIZE_MAX);
        assert(imp->offset <= imp->filled);
        assert(imp->filled <= MALLOC_SIZEOF_SAFE(imp->buf));
        assert(imp->fd >= 0);
        assert(data);

        while (imp->filled - imp->offset < size) {
                /* read() returns ssize_t; keep the full width rather than narrowing to int
                 * (this also matches get_line()). */
                ssize_t n;

                if (imp->passive_fd)
                        /* we have to wait for some data to come to us */
                        return -EAGAIN;

                /* Make room for everything up to the end of the requested chunk. */
                if (!realloc_buffer(imp, imp->offset + size))
                        return log_oom();

                n = read(imp->fd, imp->buf + imp->filled,
                         MALLOC_SIZEOF_SAFE(imp->buf) - imp->filled);
                if (n < 0) {
                        /* EAGAIN is passed up silently, everything else is logged first. */
                        if (errno != EAGAIN)
                                log_error_errno(errno, "read(%d, ..., %zu): %m", imp->fd,
                                                MALLOC_SIZEOF_SAFE(imp->buf) - imp->filled);
                        return -errno;
                } else if (n == 0)
                        return 0;

                imp->filled += n;
        }

        *data = imp->buf + imp->offset;
        imp->offset += size;

        return 1;
}
151
/* Reads the 8-byte little-endian length header of a binary field and stores it in
 * imp->data_size.
 *
 * Returns 1 on success (a zero length is accepted, but warned about), 0 or a negative error as
 * reported by fill_fixed_size(), or an EINVAL error if the declared length exceeds DATA_SIZE_MAX.
 * In the latter case imp->data_size is left untouched. */
static int get_data_size(JournalImporter *imp) {
        uint64_t declared;
        void *data;
        int r;

        assert(imp);
        assert(imp->state == IMPORTER_STATE_DATA_START);
        assert(imp->data_size == 0);

        r = fill_fixed_size(imp, &data, sizeof(uint64_t));
        if (r <= 0)
                return r;

        /* Validate the full 64-bit value before narrowing it: imp->data_size may be narrower than
         * 64 bits, and a truncated value could otherwise slip past the limit check. */
        declared = unaligned_read_le64(data);
        if (declared > DATA_SIZE_MAX)
                return log_warning_errno(SYNTHETIC_ERRNO(EINVAL),
                                         "Stream declares field with size %" PRIu64 " > DATA_SIZE_MAX = %u",
                                         declared, DATA_SIZE_MAX);

        imp->data_size = declared;
        if (imp->data_size == 0)
                log_warning("Binary field with zero length");

        return 1;
}
174
/* Fetches the payload of a binary field, i.e. the next imp->data_size bytes of the stream.
 *
 * Returns 1 and points *data at the payload inside the buffer on success; otherwise passes on the
 * zero or negative result of fill_fixed_size(). */
static int get_data_data(JournalImporter *imp, void **data) {
        int r;

        assert(imp);
        assert(data);
        assert(imp->state == IMPORTER_STATE_DATA);

        r = fill_fixed_size(imp, data, imp->data_size);

        return r <= 0 ? r : 1;
}
188
/* Consumes the single byte that follows a binary field and checks that it is a newline.
 *
 * Returns 1 if it is, an EINVAL error (after logging the escaped offending character) if it is
 * not; otherwise passes on the zero or negative result of fill_fixed_size(). */
static int get_data_newline(JournalImporter *imp) {
        char *terminator;
        char escaped[4];
        int r, len;

        assert(imp);
        assert(imp->state == IMPORTER_STATE_DATA_FINISH);

        r = fill_fixed_size(imp, (void**) &terminator, 1);
        if (r <= 0)
                return r;

        assert(terminator);
        if (*terminator == '\n')
                return 1;

        len = cescape_char(*terminator, escaped);
        return log_warning_errno(SYNTHETIC_ERRNO(EINVAL),
                                 "Expected newline, got '%.*s'", len, escaped);
}
212
/* Handles fields that are interpreted by the importer itself instead of (or in addition to) being
 * stored as ordinary entry fields. 'line' must be a NUL-terminated "NAME=value" string.
 *
 * Returns 1 if the line was consumed here and must not be stored, 0 if it should be stored as a
 * regular field, and a negative error if a recognized field carries an invalid value. */
static int process_special_field(JournalImporter *imp, char *line) {
        char escaped[CELLESCAPE_DEFAULT_LENGTH];
        const char *v;
        int r;

        assert(line);

        /* __CURSOR is recognized, but deliberately ignored */
        if (startswith(line, "__CURSOR="))
                return 1;

        v = startswith(line, "__REALTIME_TIMESTAMP=");
        if (v) {
                uint64_t stamp;

                r = safe_atou64(v, &stamp);
                if (r < 0)
                        return log_warning_errno(r, "Failed to parse __REALTIME_TIMESTAMP '%s': %m",
                                                 cellescape(escaped, sizeof escaped, v));

                if (!VALID_REALTIME(stamp)) {
                        log_warning("__REALTIME_TIMESTAMP out of range, ignoring: %"PRIu64, stamp);
                        return -ERANGE;
                }

                imp->ts.realtime = stamp;
                return 1;
        }

        v = startswith(line, "__MONOTONIC_TIMESTAMP=");
        if (v) {
                uint64_t stamp;

                r = safe_atou64(v, &stamp);
                if (r < 0)
                        return log_warning_errno(r, "Failed to parse __MONOTONIC_TIMESTAMP '%s': %m",
                                                 cellescape(escaped, sizeof escaped, v));

                if (!VALID_MONOTONIC(stamp)) {
                        log_warning("__MONOTONIC_TIMESTAMP out of range, ignoring: %"PRIu64, stamp);
                        return -ERANGE;
                }

                imp->ts.monotonic = stamp;
                return 1;
        }

        /* Only one leading underscore, yet this one is parsed here as well. */
        v = startswith(line, "_BOOT_ID=");
        if (v) {
                r = sd_id128_from_string(v, &imp->boot_id);
                if (r < 0)
                        return log_warning_errno(r, "Failed to parse _BOOT_ID '%s': %m",
                                                 cellescape(escaped, sizeof escaped, v));

                /* Return 0, so that the field is stored like any other one, too. */
                return 0;
        }

        /* Any other double-underscore field is unknown to us: report and drop it. */
        v = startswith(line, "__");
        if (v) {
                log_notice("Unknown dunder line __%s, ignoring.", cellescape(escaped, sizeof escaped, v));
                return 1;
        }

        /* Not a special field at all. */
        return 0;
}
280
/* Logs, at debug level, that the field whose name occupies the first 'len' bytes of 'field' is skipped. */
static void importer_log_invalid_field(const char *field, size_t len) {
        char escaped[64], *copy;

        copy = strndupa_safe(field, len);
        log_debug("Ignoring invalid field: \"%s\"",
                  cellescape(escaped, sizeof escaped, copy));
}

/* Advances the importer's state machine by one step.
 *
 * Returns 1 when an empty line was read, which marks the end of an entry ("event is ready");
 * 0 when a step was processed and the caller should continue (this includes reaching end of
 * input, in which case the state becomes IMPORTER_STATE_EOF); a negative error otherwise. */
int journal_importer_process_data(JournalImporter *imp) {
        int r;

        switch (imp->state) {

        case IMPORTER_STATE_LINE: {
                char *line, *eq;
                size_t len = 0;

                assert(imp->data_size == 0);

                r = get_line(imp, &line, &len);
                if (r < 0)
                        return r;
                if (r == 0) {
                        imp->state = IMPORTER_STATE_EOF;
                        return 0;
                }
                assert(len > 0);
                assert(line[len-1] == '\n');

                if (len == 1) {
                        log_trace("Received empty line, event is ready");
                        return 1;
                }

                /* Two serializations exist:
                 *
                 *     MESSAGE=xxx\n
                 * or
                 *     COREDUMP\n
                 *     LLLLLLLL0011223344...\n
                 */
                eq = memchr(line, '=', len);
                if (!eq) {
                        /* Binary form: the line holds nothing but the field name. */
                        if (!journal_field_valid(line, len - 1, true)) {
                                importer_log_invalid_field(line, len - 1);
                                return 0;
                        }

                        /* Turn the trailing \n into =, so that the name reads "NAME=". */
                        line[len-1] = '=';

                        imp->field_len = len;
                        imp->state = IMPORTER_STATE_DATA_START;

                        /* The field cannot go into the iovec before all of its data arrived. */
                } else {
                        /* Text form: drop the newline from the length. */
                        len--;

                        if (!journal_field_valid(line, eq - line, true)) {
                                importer_log_invalid_field(line, eq - line);
                                return 0;
                        }

                        line[len] = '\0';
                        r = process_special_field(imp, line);
                        if (r < 0)
                                return r;
                        if (r > 0)
                                /* consumed as a special field, nothing to store */
                                return 0;

                        r = iovw_put(&imp->iovw, line, len);
                        if (r < 0)
                                return r;
                }

                log_trace("Received: %.*s (%s)", (int) len, line, eq ? "text" : "binary");

                return 0; /* continue */
        }

        case IMPORTER_STATE_DATA_START:
                assert(imp->data_size == 0);

                r = get_data_size(imp);
                if (r < 0)
                        return r;
                if (r == 0) {
                        imp->state = IMPORTER_STATE_EOF;
                        return 0;
                }

                /* An empty payload has no data phase, go straight for the closing newline. */
                if (imp->data_size > 0)
                        imp->state = IMPORTER_STATE_DATA;
                else
                        imp->state = IMPORTER_STATE_DATA_FINISH;

                return 0; /* continue */

        case IMPORTER_STATE_DATA: {
                char *name, *entry;
                void *payload;

                assert(imp->data_size > 0);

                r = get_data_data(imp, &payload);
                if (r < 0)
                        return r;
                if (r == 0) {
                        imp->state = IMPORTER_STATE_EOF;
                        return 0;
                }

                assert(payload);

                /* In the buffer the layout is "NAME=" + 64-bit size + payload. Slide the name
                 * forward over the size, so that name and payload form one contiguous field. */
                name = (char*) payload - sizeof(uint64_t) - imp->field_len;
                entry = name + sizeof(uint64_t);
                memmove(entry, name, imp->field_len);

                r = iovw_put(&imp->iovw, entry, imp->field_len + imp->data_size);
                if (r < 0)
                        return r;

                imp->state = IMPORTER_STATE_DATA_FINISH;

                return 0; /* continue */
        }

        case IMPORTER_STATE_DATA_FINISH:
                r = get_data_newline(imp);
                if (r < 0)
                        return r;
                if (r == 0) {
                        imp->state = IMPORTER_STATE_EOF;
                        return 0;
                }

                /* The binary field is complete, go back to reading lines. */
                imp->data_size = 0;
                imp->state = IMPORTER_STATE_LINE;

                return 0; /* continue */

        default:
                assert_not_reached();
        }
}
423
/* Appends 'size' bytes from 'data' to the importer's buffer, growing the buffer first.
 *
 * Returns 0 on success, or an ENOMEM error (after logging) if the buffer could not be enlarged.
 * Must not be called once the importer reached IMPORTER_STATE_EOF. */
int journal_importer_push_data(JournalImporter *imp, const char *data, size_t size) {
        assert(imp);
        assert(imp->state != IMPORTER_STATE_EOF);

        if (realloc_buffer(imp, imp->filled + size)) {
                memcpy(imp->buf + imp->filled, data, size);
                imp->filled += size;

                return 0;
        }

        return log_error_errno(SYNTHETIC_ERRNO(ENOMEM),
                               "Failed to store received data of size %zu "
                               "(in addition to existing %zu bytes with %zu filled): %s",
                               size, MALLOC_SIZEOF_SAFE(imp->buf), imp->filled,
                               strerror_safe(ENOMEM));
}
440
/* Drops the iovec entries together with the already processed data they point at, then tidies up
 * the buffer: unconsumed bytes may be moved to the front, and an oversized allocation may be
 * shrunk. A failure to shrink is only logged. */
void journal_importer_drop_iovw(JournalImporter *imp) {
        size_t pending, allocated, shrunk;
        char *smaller;

        iovw_free_contents(&imp->iovw, false);

        /* Possibly reset the buffer position. */
        pending = imp->filled - imp->offset;

        if (pending == 0) {
                /* Everything was consumed, simply start over at the beginning. */
                imp->filled = 0;
                imp->scanned = 0;
                imp->offset = 0;
        } else if (imp->offset > MALLOC_SIZEOF_SAFE(imp->buf) - imp->filled &&
                   imp->offset > pending) {
                /* The consumed prefix is larger than both the free tail and the pending data.
                 * Since offset > pending, source and destination do not overlap. */
                memcpy(imp->buf, imp->buf + imp->offset, pending);
                imp->offset = imp->scanned = 0;
                imp->filled = pending;
        }

        /* Halve the target size while it stays above 16 chunks and the data fits into half of it. */
        allocated = MALLOC_SIZEOF_SAFE(imp->buf);
        shrunk = allocated;
        while (shrunk > 16 * LINE_CHUNK && imp->filled < shrunk / 2)
                shrunk /= 2;

        if (shrunk >= allocated)
                return;

        smaller = realloc(imp->buf, shrunk);
        if (!smaller) {
                /* The old buffer remains valid, so just carry on with it. */
                log_warning("Failed to reallocate buffer to (smaller) size %zu",
                            shrunk);
                return;
        }

        log_debug("Reallocated buffer from %zu to %zu bytes",
                  allocated, shrunk);
        imp->buf = smaller;
}
480
journal_importer_eof(const JournalImporter * imp)481 bool journal_importer_eof(const JournalImporter *imp) {
482 return imp->state == IMPORTER_STATE_EOF;
483 }
484