1 /* SPDX-License-Identifier: LGPL-2.1-or-later */
2
3 #include <curl/curl.h>
4 #include <stdbool.h>
5
6 #include "sd-daemon.h"
7
8 #include "alloc-util.h"
9 #include "journal-upload.h"
10 #include "log.h"
11 #include "string-util.h"
12 #include "utf8.h"
13 #include "util.h"
14
15 /**
16 * Write up to size bytes to buf. Return negative on error, and number of
17 * bytes written otherwise. The last case is a kind of an error too.
18 */
write_entry(char * buf,size_t size,Uploader * u)19 static ssize_t write_entry(char *buf, size_t size, Uploader *u) {
20 int r;
21 size_t pos = 0;
22
23 assert(size <= SSIZE_MAX);
24
25 for (;;) {
26
27 switch (u->entry_state) {
28 case ENTRY_CURSOR: {
29 u->current_cursor = mfree(u->current_cursor);
30
31 r = sd_journal_get_cursor(u->journal, &u->current_cursor);
32 if (r < 0)
33 return log_error_errno(r, "Failed to get cursor: %m");
34
35 r = snprintf(buf + pos, size - pos,
36 "__CURSOR=%s\n", u->current_cursor);
37 assert(r >= 0);
38 if ((size_t) r > size - pos)
39 /* not enough space */
40 return pos;
41
42 u->entry_state++;
43
44 if (pos + r == size) {
45 /* exactly one character short, but we don't need it */
46 buf[size - 1] = '\n';
47 return size;
48 }
49
50 pos += r;
51 }
52 _fallthrough_;
53 case ENTRY_REALTIME: {
54 usec_t realtime;
55
56 r = sd_journal_get_realtime_usec(u->journal, &realtime);
57 if (r < 0)
58 return log_error_errno(r, "Failed to get realtime timestamp: %m");
59
60 r = snprintf(buf + pos, size - pos,
61 "__REALTIME_TIMESTAMP="USEC_FMT"\n", realtime);
62 assert(r >= 0);
63 if ((size_t) r > size - pos)
64 /* not enough space */
65 return pos;
66
67 u->entry_state++;
68
69 if (r + pos == size) {
70 /* exactly one character short, but we don't need it */
71 buf[size - 1] = '\n';
72 return size;
73 }
74
75 pos += r;
76 }
77 _fallthrough_;
78 case ENTRY_MONOTONIC: {
79 usec_t monotonic;
80 sd_id128_t boot_id;
81
82 r = sd_journal_get_monotonic_usec(u->journal, &monotonic, &boot_id);
83 if (r < 0)
84 return log_error_errno(r, "Failed to get monotonic timestamp: %m");
85
86 r = snprintf(buf + pos, size - pos,
87 "__MONOTONIC_TIMESTAMP="USEC_FMT"\n", monotonic);
88 assert(r >= 0);
89 if ((size_t) r > size - pos)
90 /* not enough space */
91 return pos;
92
93 u->entry_state++;
94
95 if (r + pos == size) {
96 /* exactly one character short, but we don't need it */
97 buf[size - 1] = '\n';
98 return size;
99 }
100
101 pos += r;
102 }
103 _fallthrough_;
104 case ENTRY_BOOT_ID: {
105 sd_id128_t boot_id;
106
107 r = sd_journal_get_monotonic_usec(u->journal, NULL, &boot_id);
108 if (r < 0)
109 return log_error_errno(r, "Failed to get monotonic timestamp: %m");
110
111 r = snprintf(buf + pos, size - pos,
112 "_BOOT_ID=%s\n", SD_ID128_TO_STRING(boot_id));
113 assert(r >= 0);
114 if ((size_t) r > size - pos)
115 /* not enough space */
116 return pos;
117
118 u->entry_state++;
119
120 if (r + pos == size) {
121 /* exactly one character short, but we don't need it */
122 buf[size - 1] = '\n';
123 return size;
124 }
125
126 pos += r;
127 }
128 _fallthrough_;
129 case ENTRY_NEW_FIELD: {
130 u->field_pos = 0;
131
132 r = sd_journal_enumerate_data(u->journal,
133 &u->field_data,
134 &u->field_length);
135 if (r < 0)
136 return log_error_errno(r, "Failed to move to next field in entry: %m");
137 else if (r == 0) {
138 u->entry_state = ENTRY_OUTRO;
139 continue;
140 }
141
142 /* We already printed the boot id from the data in
143 * the header, hence let's suppress it here */
144 if (memory_startswith(u->field_data, u->field_length, "_BOOT_ID="))
145 continue;
146
147 if (!utf8_is_printable_newline(u->field_data, u->field_length, false)) {
148 u->entry_state = ENTRY_BINARY_FIELD_START;
149 continue;
150 }
151
152 u->entry_state++;
153 }
154 _fallthrough_;
155 case ENTRY_TEXT_FIELD:
156 case ENTRY_BINARY_FIELD: {
157 bool done;
158 size_t tocopy;
159
160 done = size - pos > u->field_length - u->field_pos;
161 if (done)
162 tocopy = u->field_length - u->field_pos;
163 else
164 tocopy = size - pos;
165
166 memcpy(buf + pos,
167 (char*) u->field_data + u->field_pos,
168 tocopy);
169
170 if (done) {
171 buf[pos + tocopy] = '\n';
172 pos += tocopy + 1;
173 u->entry_state = ENTRY_NEW_FIELD;
174 continue;
175 } else {
176 u->field_pos += tocopy;
177 return size;
178 }
179 }
180
181 case ENTRY_BINARY_FIELD_START: {
182 const char *c;
183 size_t len;
184
185 c = memchr(u->field_data, '=', u->field_length);
186 if (!c || c == u->field_data)
187 return log_error_errno(SYNTHETIC_ERRNO(EINVAL),
188 "Invalid field.");
189
190 len = c - (const char*)u->field_data;
191
192 /* need space for label + '\n' */
193 if (size - pos < len + 1)
194 return pos;
195
196 memcpy(buf + pos, u->field_data, len);
197 buf[pos + len] = '\n';
198 pos += len + 1;
199
200 u->field_pos = len + 1;
201 u->entry_state++;
202 }
203 _fallthrough_;
204 case ENTRY_BINARY_FIELD_SIZE: {
205 uint64_t le64;
206
207 /* need space for uint64_t */
208 if (size - pos < 8)
209 return pos;
210
211 le64 = htole64(u->field_length - u->field_pos);
212 memcpy(buf + pos, &le64, 8);
213 pos += 8;
214
215 u->entry_state++;
216 continue;
217 }
218
219 case ENTRY_OUTRO:
220 /* need space for '\n' */
221 if (size - pos < 1)
222 return pos;
223
224 buf[pos++] = '\n';
225 u->entry_state++;
226 u->entries_sent++;
227
228 return pos;
229
230 default:
231 assert_not_reached();
232 }
233 }
234 assert_not_reached();
235 }
236
/* Sends "WATCHDOG=1" via sd_notify() once more than half of u->watchdog_usec
 * has elapsed since u->watchdog_timestamp, and restarts the measurement from
 * the current monotonic time. Does nothing when u->watchdog_usec is not
 * positive. */
static void check_update_watchdog(Uploader *u) {
        usec_t current, since_last;

        if (u->watchdog_usec <= 0)
                return;

        current = now(CLOCK_MONOTONIC);
        since_last = usec_sub_unsigned(current, u->watchdog_timestamp);
        if (since_last <= u->watchdog_usec / 2)
                /* Still within the first half of the interval. */
                return;

        log_debug("Update watchdog timer");
        sd_notify(false, "WATCHDOG=1");
        u->watchdog_timestamp = current;
}
252
journal_input_callback(void * buf,size_t size,size_t nmemb,void * userp)253 static size_t journal_input_callback(void *buf, size_t size, size_t nmemb, void *userp) {
254 Uploader *u = userp;
255 int r;
256 sd_journal *j;
257 size_t filled = 0;
258 ssize_t w;
259
260 assert(u);
261 assert(nmemb <= SSIZE_MAX / size);
262
263 check_update_watchdog(u);
264
265 j = u->journal;
266
267 while (j && filled < size * nmemb) {
268 if (u->entry_state == ENTRY_DONE) {
269 r = sd_journal_next(j);
270 if (r < 0) {
271 log_error_errno(r, "Failed to move to next entry in journal: %m");
272 return CURL_READFUNC_ABORT;
273 } else if (r == 0) {
274 if (u->input_event)
275 log_debug("No more entries, waiting for journal.");
276 else {
277 log_info("No more entries, closing journal.");
278 close_journal_input(u);
279 }
280
281 u->uploading = false;
282
283 break;
284 }
285
286 u->entry_state = ENTRY_CURSOR;
287 }
288
289 w = write_entry((char*)buf + filled, size * nmemb - filled, u);
290 if (w < 0)
291 return CURL_READFUNC_ABORT;
292 filled += w;
293
294 if (filled == 0) {
295 log_error("Buffer space is too small to write entry.");
296 return CURL_READFUNC_ABORT;
297 } else if (u->entry_state != ENTRY_DONE)
298 /* This means that all available space was used up */
299 break;
300
301 log_debug("Entry %zu (%s) has been uploaded.",
302 u->entries_sent, u->current_cursor);
303 }
304
305 return filled;
306 }
307
/* Closes u->journal, if one is open, and resets u->timeout to 0. */
void close_journal_input(Uploader *u) {
        assert(u);

        u->timeout = 0;

        if (!u->journal)
                return;

        log_debug("Closing journal input.");

        sd_journal_close(u->journal);
        u->journal = NULL;
}
319
/* Advances the journal by skip entries and, if that many entries were
 * available, starts an upload that reads from journal_input_callback().
 *
 * Returns 0 when an upload is already in progress or when fewer than skip
 * entries could be skipped, a negative errno-style value when advancing the
 * journal fails, and the result of start_upload() otherwise. */
static int process_journal_input(Uploader *u, int skip) {
        int advanced;

        /* Nothing to do while an upload is in progress. */
        if (u->uploading)
                return 0;

        advanced = sd_journal_next_skip(u->journal, skip);
        if (advanced < 0)
                return log_error_errno(advanced, "Failed to skip to next entry: %m");
        if (advanced < skip)
                return 0;

        /* have data */
        u->entry_state = ENTRY_CURSOR;

        return start_upload(u, journal_input_callback, u);
}
336
/* Looks for new journal entries and starts uploading them if there are any.
 *
 * When u->input_event is set, pending journal changes are processed first: a
 * failure there closes the journal input and is returned, and SD_JOURNAL_NOP
 * makes the function return 0 without looking any further. */
int check_journal_input(Uploader *u) {
        int r;

        if (!u->input_event)
                return process_journal_input(u, 1);

        r = sd_journal_process(u->journal);
        if (r < 0) {
                log_error_errno(r, "Failed to process journal: %m");
                close_journal_input(u);
                return r;
        }

        return r == SD_JOURNAL_NOP ? 0 : process_journal_input(u, 1);
}
354
/* I/O event handler registered by open_journal_for_upload() for the journal
 * file descriptor. userp is the Uploader; event, fd and revents are not used.
 * Checks for new journal data unless an upload is already in progress. */
static int dispatch_journal_input(sd_event_source *event,
                                  int fd,
                                  uint32_t revents,
                                  void *userp) {
        Uploader *u = userp;

        assert(u);

        if (!u->uploading) {
                log_debug("Detected journal input, checking for new data.");
                return check_journal_input(u);
        }

        return 0;
}
369
/* Attaches journal j to the uploader and starts uploading from it.
 *
 * u            - uploader state; u->journal is set to j.
 * j            - opened journal; its data threshold is set to 0.
 * cursor       - if non-NULL, the journal is first positioned at this cursor.
 * after_cursor - if true, one entry is skipped before uploading begins.
 * follow       - if true, the journal's file descriptor is registered with
 *                u->events so that dispatch_journal_input() runs on new data,
 *                and u->timeout is set to -1 when the descriptor is reliable
 *                or to JOURNAL_UPLOAD_POLL_TIMEOUT otherwise.
 *
 * Returns a negative errno-style value on failure, and the result of
 * process_journal_input() otherwise. */
int open_journal_for_upload(Uploader *u,
                            sd_journal *j,
                            const char *cursor,
                            bool after_cursor,
                            bool follow) {
        int r;

        assert(u);

        u->journal = j;

        /* Best effort, a failure here is deliberately ignored. */
        (void) sd_journal_set_data_threshold(j, 0);

        if (follow) {
                int fd, events;

                fd = sd_journal_get_fd(j);
                if (fd < 0)
                        return log_error_errno(fd, "sd_journal_get_fd failed: %m");

                /* A negative value is an error code and must not be used as an event mask. */
                events = sd_journal_get_events(j);
                if (events < 0)
                        return log_error_errno(events, "sd_journal_get_events failed: %m");

                r = sd_journal_reliable_fd(j);
                assert(r >= 0);
                if (r > 0)
                        u->timeout = -1;
                else
                        u->timeout = JOURNAL_UPLOAD_POLL_TIMEOUT;

                r = sd_event_add_io(u->events, &u->input_event,
                                    fd, events, dispatch_journal_input, u);
                if (r < 0)
                        return log_error_errno(r, "Failed to register input event: %m");

                log_debug("Listening for journal events on fd:%d, timeout %d",
                          fd, u->timeout == UINT64_MAX ? -1 : (int) u->timeout);
        } else
                log_debug("Not listening for journal events.");

        if (cursor) {
                r = sd_journal_seek_cursor(j, cursor);
                if (r < 0)
                        return log_error_errno(r, "Failed to seek to cursor %s: %m",
                                               cursor);
        }

        return process_journal_input(u, !!after_cursor);
}
414