Lines Matching refs:imp

27 void journal_importer_cleanup(JournalImporter *imp) {  in journal_importer_cleanup()  argument
28 if (imp->fd >= 0 && !imp->passive_fd) { in journal_importer_cleanup()
29 log_debug("Closing %s (fd=%d)", imp->name ?: "importer", imp->fd); in journal_importer_cleanup()
30 safe_close(imp->fd); in journal_importer_cleanup()
33 free(imp->name); in journal_importer_cleanup()
34 free(imp->buf); in journal_importer_cleanup()
35 iovw_free_contents(&imp->iovw, false); in journal_importer_cleanup()
38 static char* realloc_buffer(JournalImporter *imp, size_t size) { in realloc_buffer() argument
39 char *b, *old = imp->buf; in realloc_buffer()
41 b = GREEDY_REALLOC(imp->buf, size); in realloc_buffer()
45 iovw_rebase(&imp->iovw, old, imp->buf); in realloc_buffer()
50 static int get_line(JournalImporter *imp, char **line, size_t *size) { in get_line() argument
54 assert(imp); in get_line()
55 assert(imp->state == IMPORTER_STATE_LINE); in get_line()
56 assert(imp->offset <= imp->filled); in get_line()
57 assert(imp->filled <= MALLOC_SIZEOF_SAFE(imp->buf)); in get_line()
58 assert(imp->fd >= 0); in get_line()
61 if (imp->buf) { in get_line()
62 size_t start = MAX(imp->scanned, imp->offset); in get_line()
64 c = memchr(imp->buf + start, '\n', in get_line()
65 imp->filled - start); in get_line()
70 imp->scanned = imp->filled; in get_line()
71 if (imp->scanned >= DATA_SIZE_MAX) in get_line()
76 if (imp->passive_fd) in get_line()
83 if (MALLOC_SIZEOF_SAFE(imp->buf) - imp->filled < LINE_CHUNK && in get_line()
84 !realloc_buffer(imp, MIN(imp->filled + LINE_CHUNK, ENTRY_SIZE_MAX))) in get_line()
87 assert(imp->buf); in get_line()
88 assert(MALLOC_SIZEOF_SAFE(imp->buf) - imp->filled >= LINE_CHUNK || in get_line()
89 MALLOC_SIZEOF_SAFE(imp->buf) >= ENTRY_SIZE_MAX); in get_line()
91 n = read(imp->fd, in get_line()
92 imp->buf + imp->filled, in get_line()
93 MALLOC_SIZEOF_SAFE(imp->buf) - imp->filled); in get_line()
97 imp->fd, in get_line()
98 MALLOC_SIZEOF_SAFE(imp->buf) - imp->filled); in get_line()
103 imp->filled += n; in get_line()
106 *line = imp->buf + imp->offset; in get_line()
107 *size = c + 1 - imp->buf - imp->offset; in get_line()
108 imp->offset += *size; in get_line()
113 static int fill_fixed_size(JournalImporter *imp, void **data, size_t size) { in fill_fixed_size() argument
115 assert(imp); in fill_fixed_size()
116 …assert(IN_SET(imp->state, IMPORTER_STATE_DATA_START, IMPORTER_STATE_DATA, IMPORTER_STATE_DATA_FINI… in fill_fixed_size()
118 assert(imp->offset <= imp->filled); in fill_fixed_size()
119 assert(imp->filled <= MALLOC_SIZEOF_SAFE(imp->buf)); in fill_fixed_size()
120 assert(imp->fd >= 0); in fill_fixed_size()
123 while (imp->filled - imp->offset < size) { in fill_fixed_size()
126 if (imp->passive_fd) in fill_fixed_size()
130 if (!realloc_buffer(imp, imp->offset + size)) in fill_fixed_size()
133 n = read(imp->fd, imp->buf + imp->filled, in fill_fixed_size()
134 MALLOC_SIZEOF_SAFE(imp->buf) - imp->filled); in fill_fixed_size()
137 log_error_errno(errno, "read(%d, ..., %zu): %m", imp->fd, in fill_fixed_size()
138 MALLOC_SIZEOF_SAFE(imp->buf) - imp->filled); in fill_fixed_size()
143 imp->filled += n; in fill_fixed_size()
146 *data = imp->buf + imp->offset; in fill_fixed_size()
147 imp->offset += size; in fill_fixed_size()
152 static int get_data_size(JournalImporter *imp) { in get_data_size() argument
156 assert(imp); in get_data_size()
157 assert(imp->state == IMPORTER_STATE_DATA_START); in get_data_size()
158 assert(imp->data_size == 0); in get_data_size()
160 r = fill_fixed_size(imp, &data, sizeof(uint64_t)); in get_data_size()
164 imp->data_size = unaligned_read_le64(data); in get_data_size()
165 if (imp->data_size > DATA_SIZE_MAX) in get_data_size()
168 imp->data_size, DATA_SIZE_MAX); in get_data_size()
169 if (imp->data_size == 0) in get_data_size()
175 static int get_data_data(JournalImporter *imp, void **data) { in get_data_data() argument
178 assert(imp); in get_data_data()
180 assert(imp->state == IMPORTER_STATE_DATA); in get_data_data()
182 r = fill_fixed_size(imp, data, imp->data_size); in get_data_data()
189 static int get_data_newline(JournalImporter *imp) { in get_data_newline() argument
193 assert(imp); in get_data_newline()
194 assert(imp->state == IMPORTER_STATE_DATA_FINISH); in get_data_newline()
196 r = fill_fixed_size(imp, (void**) &data, 1); in get_data_newline()
213 static int process_special_field(JournalImporter *imp, char *line) { in process_special_field() argument
238 imp->ts.realtime = x; in process_special_field()
255 imp->ts.monotonic = x; in process_special_field()
262 r = sd_id128_from_string(value, &imp->boot_id); in process_special_field()
281 int journal_importer_process_data(JournalImporter *imp) { in journal_importer_process_data() argument
284 switch (imp->state) { in journal_importer_process_data()
289 assert(imp->data_size == 0); in journal_importer_process_data()
291 r = get_line(imp, &line, &n); in journal_importer_process_data()
295 imp->state = IMPORTER_STATE_EOF; in journal_importer_process_data()
327 r = process_special_field(imp, line); in journal_importer_process_data()
331 r = iovw_put(&imp->iovw, line, n); in journal_importer_process_data()
348 imp->field_len = n; in journal_importer_process_data()
349 imp->state = IMPORTER_STATE_DATA_START; in journal_importer_process_data()
360 assert(imp->data_size == 0); in journal_importer_process_data()
362 r = get_data_size(imp); in journal_importer_process_data()
367 imp->state = IMPORTER_STATE_EOF; in journal_importer_process_data()
371 imp->state = imp->data_size > 0 ? in journal_importer_process_data()
380 assert(imp->data_size > 0); in journal_importer_process_data()
382 r = get_data_data(imp, &data); in journal_importer_process_data()
387 imp->state = IMPORTER_STATE_EOF; in journal_importer_process_data()
393 field = (char*) data - sizeof(uint64_t) - imp->field_len; in journal_importer_process_data()
394 memmove(field + sizeof(uint64_t), field, imp->field_len); in journal_importer_process_data()
396 r = iovw_put(&imp->iovw, field + sizeof(uint64_t), imp->field_len + imp->data_size); in journal_importer_process_data()
400 imp->state = IMPORTER_STATE_DATA_FINISH; in journal_importer_process_data()
406 r = get_data_newline(imp); in journal_importer_process_data()
411 imp->state = IMPORTER_STATE_EOF; in journal_importer_process_data()
415 imp->data_size = 0; in journal_importer_process_data()
416 imp->state = IMPORTER_STATE_LINE; in journal_importer_process_data()
424 int journal_importer_push_data(JournalImporter *imp, const char *data, size_t size) { in journal_importer_push_data() argument
425 assert(imp); in journal_importer_push_data()
426 assert(imp->state != IMPORTER_STATE_EOF); in journal_importer_push_data()
428 if (!realloc_buffer(imp, imp->filled + size)) in journal_importer_push_data()
432 size, MALLOC_SIZEOF_SAFE(imp->buf), imp->filled, in journal_importer_push_data()
435 memcpy(imp->buf + imp->filled, data, size); in journal_importer_push_data()
436 imp->filled += size; in journal_importer_push_data()
441 void journal_importer_drop_iovw(JournalImporter *imp) { in journal_importer_drop_iovw() argument
446 iovw_free_contents(&imp->iovw, false); in journal_importer_drop_iovw()
449 remain = imp->filled - imp->offset; in journal_importer_drop_iovw()
452 imp->offset = imp->scanned = imp->filled = 0; in journal_importer_drop_iovw()
453 else if (imp->offset > MALLOC_SIZEOF_SAFE(imp->buf) - imp->filled && in journal_importer_drop_iovw()
454 imp->offset > remain) { in journal_importer_drop_iovw()
455 memcpy(imp->buf, imp->buf + imp->offset, remain); in journal_importer_drop_iovw()
456 imp->offset = imp->scanned = 0; in journal_importer_drop_iovw()
457 imp->filled = remain; in journal_importer_drop_iovw()
460 target = MALLOC_SIZEOF_SAFE(imp->buf); in journal_importer_drop_iovw()
461 while (target > 16 * LINE_CHUNK && imp->filled < target / 2) in journal_importer_drop_iovw()
463 if (target < MALLOC_SIZEOF_SAFE(imp->buf)) { in journal_importer_drop_iovw()
467 old_size = MALLOC_SIZEOF_SAFE(imp->buf); in journal_importer_drop_iovw()
469 tmp = realloc(imp->buf, target); in journal_importer_drop_iovw()
476 imp->buf = tmp; in journal_importer_drop_iovw()
481 bool journal_importer_eof(const JournalImporter *imp) { in journal_importer_eof() argument
482 return imp->state == IMPORTER_STATE_EOF; in journal_importer_eof()