1 /* SPDX-License-Identifier: LGPL-2.1-or-later */
2 
3 #include <curl/curl.h>
4 #include <stdbool.h>
5 
6 #include "sd-daemon.h"
7 
8 #include "alloc-util.h"
9 #include "journal-upload.h"
10 #include "log.h"
11 #include "string-util.h"
12 #include "utf8.h"
13 #include "util.h"
14 
/**
 * Serialize as much of the current journal entry as fits into buf.
 *
 * At most size bytes are written. Progress is kept in u->entry_state (and in
 * u->field_pos for a partially copied field), so a single entry may be
 * produced piecewise across several calls.
 *
 * Returns a negative value on error, and the number of bytes written
 * otherwise. The count may be smaller than size, including 0, when the next
 * piece that has to be written in one go does not fit into the remaining
 * space; journal_input_callback() treats a completely empty buffer as an
 * error.
 */
static ssize_t write_entry(char *buf, size_t size, Uploader *u) {
        int r;
        size_t pos = 0;

        assert(size <= SSIZE_MAX);

        for (;;) {

                switch (u->entry_state) {
                case ENTRY_CURSOR: {
                        /* Drop the cursor of the previous entry before fetching a new one. */
                        u->current_cursor = mfree(u->current_cursor);

                        r = sd_journal_get_cursor(u->journal, &u->current_cursor);
                        if (r < 0)
                                return log_error_errno(r, "Failed to get cursor: %m");

                        /* snprintf() reports the full length of the line (without the
                         * trailing NUL), even if it had to truncate. */
                        r = snprintf(buf + pos, size - pos,
                                     "__CURSOR=%s\n", u->current_cursor);
                        assert(r >= 0);
                        if ((size_t) r > size - pos)
                                /* not enough space; the state is not advanced, so this
                                 * line is attempted again on the next call */
                                return pos;

                        u->entry_state++;

                        if (pos + r == size) {
                                /* The line fills the buffer exactly: only the NUL
                                 * terminator did not fit, and snprintf() stored it in
                                 * place of the final '\n'. We don't need the NUL, so
                                 * put the newline back. */
                                buf[size - 1] = '\n';
                                return size;
                        }

                        pos += r;
                }
                        _fallthrough_;
                case ENTRY_REALTIME: {
                        usec_t realtime;

                        r = sd_journal_get_realtime_usec(u->journal, &realtime);
                        if (r < 0)
                                return log_error_errno(r, "Failed to get realtime timestamp: %m");

                        r = snprintf(buf + pos, size - pos,
                                     "__REALTIME_TIMESTAMP="USEC_FMT"\n", realtime);
                        assert(r >= 0);
                        if ((size_t) r > size - pos)
                                /* not enough space */
                                return pos;

                        u->entry_state++;

                        if (r + pos == size) {
                                /* exact fit except for the NUL, which replaced the final
                                 * '\n'; restore the newline (see ENTRY_CURSOR) */
                                buf[size - 1] = '\n';
                                return size;
                        }

                        pos += r;
                }
                        _fallthrough_;
                case ENTRY_MONOTONIC: {
                        usec_t monotonic;
                        sd_id128_t boot_id;

                        /* Only the timestamp is printed here; boot_id is fetched but
                         * not used in this state. */
                        r = sd_journal_get_monotonic_usec(u->journal, &monotonic, &boot_id);
                        if (r < 0)
                                return log_error_errno(r, "Failed to get monotonic timestamp: %m");

                        r = snprintf(buf + pos, size - pos,
                                     "__MONOTONIC_TIMESTAMP="USEC_FMT"\n", monotonic);
                        assert(r >= 0);
                        if ((size_t) r > size - pos)
                                /* not enough space */
                                return pos;

                        u->entry_state++;

                        if (r + pos == size) {
                                /* exact fit except for the NUL, which replaced the final
                                 * '\n'; restore the newline (see ENTRY_CURSOR) */
                                buf[size - 1] = '\n';
                                return size;
                        }

                        pos += r;
                }
                        _fallthrough_;
                case ENTRY_BOOT_ID: {
                        sd_id128_t boot_id;

                        /* The boot ID is obtained through the monotonic timestamp call,
                         * with the timestamp output itself left out. */
                        r = sd_journal_get_monotonic_usec(u->journal, NULL, &boot_id);
                        if (r < 0)
                                return log_error_errno(r, "Failed to get monotonic timestamp: %m");

                        r = snprintf(buf + pos, size - pos,
                                     "_BOOT_ID=%s\n", SD_ID128_TO_STRING(boot_id));
                        assert(r >= 0);
                        if ((size_t) r > size - pos)
                                /* not enough space */
                                return pos;

                        u->entry_state++;

                        if (r + pos == size) {
                                /* exact fit except for the NUL, which replaced the final
                                 * '\n'; restore the newline (see ENTRY_CURSOR) */
                                buf[size - 1] = '\n';
                                return size;
                        }

                        pos += r;
                }
                        _fallthrough_;
                case ENTRY_NEW_FIELD: {
                        /* Start copying the next field from its first byte. */
                        u->field_pos = 0;

                        r = sd_journal_enumerate_data(u->journal,
                                                      &u->field_data,
                                                      &u->field_length);
                        if (r < 0)
                                return log_error_errno(r, "Failed to move to next field in entry: %m");
                        else if (r == 0) {
                                /* no field was returned: finish the entry */
                                u->entry_state = ENTRY_OUTRO;
                                continue;
                        }

                        /* We already printed the boot id from the data in
                         * the header, hence let's suppress it here */
                        if (memory_startswith(u->field_data, u->field_length, "_BOOT_ID="))
                                /* NOTE(review): relies on entry_state being ENTRY_NEW_FIELD
                                 * at this point so that the next field gets fetched. */
                                continue;

                        /* Fields that fail the printable check are emitted through the
                         * binary states: name, 64-bit length, raw payload. */
                        if (!utf8_is_printable_newline(u->field_data, u->field_length, false)) {
                                u->entry_state = ENTRY_BINARY_FIELD_START;
                                continue;
                        }

                        /* presumably advances to ENTRY_TEXT_FIELD; the enum order is
                         * defined outside of this file */
                        u->entry_state++;
                }
                        _fallthrough_;
                case ENTRY_TEXT_FIELD:
                case ENTRY_BINARY_FIELD: {
                        bool done;
                        size_t tocopy;

                        /* The field is finished in this call only if the remaining
                         * data plus the terminating '\n' fit into the space left. */
                        done = size - pos > u->field_length - u->field_pos;
                        if (done)
                                tocopy = u->field_length - u->field_pos;
                        else
                                tocopy = size - pos;

                        memcpy(buf + pos,
                               (char*) u->field_data + u->field_pos,
                               tocopy);

                        if (done) {
                                buf[pos + tocopy] = '\n';
                                pos += tocopy + 1;
                                u->entry_state = ENTRY_NEW_FIELD;
                                continue;
                        } else {
                                /* Buffer is full; remember how far we got so that the
                                 * next call resumes in the middle of the field. */
                                u->field_pos += tocopy;
                                return size;
                        }
                }

                case ENTRY_BINARY_FIELD_START: {
                        const char *c;
                        size_t len;

                        /* The field name is everything before the first '='; it must
                         * exist and must not be empty. */
                        c = memchr(u->field_data, '=', u->field_length);
                        if (!c || c == u->field_data)
                                return log_error_errno(SYNTHETIC_ERRNO(EINVAL),
                                                       "Invalid field.");

                        len = c - (const char*)u->field_data;

                        /* need space for label + '\n' */
                        if (size - pos < len + 1)
                                return pos;

                        memcpy(buf + pos, u->field_data, len);
                        buf[pos + len] = '\n';
                        pos += len + 1;

                        /* The payload starts right after the '='. */
                        u->field_pos = len + 1;
                        u->entry_state++;
                }
                        _fallthrough_;
                case ENTRY_BINARY_FIELD_SIZE: {
                        uint64_t le64;

                        /* need space for uint64_t */
                        if (size - pos < 8)
                                return pos;

                        /* Payload length (field minus name and '=') as a 64-bit
                         * little-endian integer. */
                        le64 = htole64(u->field_length - u->field_pos);
                        memcpy(buf + pos, &le64, 8);
                        pos += 8;

                        u->entry_state++;
                        continue;
                }

                case ENTRY_OUTRO:
                        /* need space for '\n' */
                        if (size - pos < 1)
                                return pos;

                        /* An empty line terminates the entry. */
                        buf[pos++] = '\n';
                        /* presumably advances to ENTRY_DONE, which is what
                         * journal_input_callback() checks for */
                        u->entry_state++;
                        u->entries_sent++;

                        return pos;

                default:
                        assert_not_reached();
                }
        }
        assert_not_reached();
}
236 
/* Send a watchdog keep-alive notification if more than half of
 * u->watchdog_usec has passed since the last one. Does nothing when
 * u->watchdog_usec is not positive. */
static void check_update_watchdog(Uploader *u) {
        usec_t current, since_last_ping;

        if (u->watchdog_usec <= 0)
                return;

        current = now(CLOCK_MONOTONIC);
        since_last_ping = usec_sub_unsigned(current, u->watchdog_timestamp);

        /* Not yet past the half-way mark: nothing to do. */
        if (since_last_ping <= u->watchdog_usec / 2)
                return;

        log_debug("Update watchdog timer");
        sd_notify(false, "WATCHDOG=1");
        u->watchdog_timestamp = current;
}
252 
journal_input_callback(void * buf,size_t size,size_t nmemb,void * userp)253 static size_t journal_input_callback(void *buf, size_t size, size_t nmemb, void *userp) {
254         Uploader *u = userp;
255         int r;
256         sd_journal *j;
257         size_t filled = 0;
258         ssize_t w;
259 
260         assert(u);
261         assert(nmemb <= SSIZE_MAX / size);
262 
263         check_update_watchdog(u);
264 
265         j = u->journal;
266 
267         while (j && filled < size * nmemb) {
268                 if (u->entry_state == ENTRY_DONE) {
269                         r = sd_journal_next(j);
270                         if (r < 0) {
271                                 log_error_errno(r, "Failed to move to next entry in journal: %m");
272                                 return CURL_READFUNC_ABORT;
273                         } else if (r == 0) {
274                                 if (u->input_event)
275                                         log_debug("No more entries, waiting for journal.");
276                                 else {
277                                         log_info("No more entries, closing journal.");
278                                         close_journal_input(u);
279                                 }
280 
281                                 u->uploading = false;
282 
283                                 break;
284                         }
285 
286                         u->entry_state = ENTRY_CURSOR;
287                 }
288 
289                 w = write_entry((char*)buf + filled, size * nmemb - filled, u);
290                 if (w < 0)
291                         return CURL_READFUNC_ABORT;
292                 filled += w;
293 
294                 if (filled == 0) {
295                         log_error("Buffer space is too small to write entry.");
296                         return CURL_READFUNC_ABORT;
297                 } else if (u->entry_state != ENTRY_DONE)
298                         /* This means that all available space was used up */
299                         break;
300 
301                 log_debug("Entry %zu (%s) has been uploaded.",
302                           u->entries_sent, u->current_cursor);
303         }
304 
305         return filled;
306 }
307 
/* Close the journal attached to the uploader, if any, and reset u->timeout
 * to 0. Calling this without an open journal only resets the timeout. */
void close_journal_input(Uploader *u) {
        sd_journal *journal;

        assert(u);

        journal = u->journal;
        if (journal) {
                log_debug("Closing journal input.");

                sd_journal_close(journal);
                u->journal = NULL;
        }

        u->timeout = 0;
}
319 
/* Advance the journal by skip entries and, if that succeeds, start an upload
 * fed by journal_input_callback().
 *
 * Returns 0 when an upload is already running or when fewer than skip
 * entries could be skipped, a negative value if advancing the journal
 * failed, and the result of start_upload() otherwise. */
static int process_journal_input(Uploader *u, int skip) {
        int advanced;

        /* An upload in progress will pick up the data by itself. */
        if (u->uploading)
                return 0;

        advanced = sd_journal_next_skip(u->journal, skip);
        if (advanced < 0)
                return log_error_errno(advanced, "Failed to skip to next entry: %m");
        if (advanced < skip)
                return 0;

        /* have data: begin with the first state of a fresh entry */
        u->entry_state = ENTRY_CURSOR;
        return start_upload(u, journal_input_callback, u);
}
336 
/* Check whether the journal has new entries and start uploading them.
 *
 * When an input event source is registered, the journal is processed first;
 * on failure the journal is closed and the error is returned, and if the
 * journal reports SD_JOURNAL_NOP, 0 is returned. In every other case the
 * result of process_journal_input(u, 1) is returned. */
int check_journal_input(Uploader *u) {
        int state;

        if (!u->input_event)
                return process_journal_input(u, 1);

        state = sd_journal_process(u->journal);
        if (state < 0) {
                log_error_errno(state, "Failed to process journal: %m");
                close_journal_input(u);
                return state;
        }

        if (state == SD_JOURNAL_NOP)
                return 0;

        return process_journal_input(u, 1);
}
354 
/* I/O event handler registered by open_journal_for_upload() for the journal
 * file descriptor. userp is the Uploader; event, fd and revents are not used.
 *
 * Returns 0 if an upload is in progress, otherwise the result of
 * check_journal_input(). */
static int dispatch_journal_input(sd_event_source *event,
                                  int fd,
                                  uint32_t revents,
                                  void *userp) {
        Uploader *uploader = userp;

        assert(uploader);

        if (!uploader->uploading) {
                log_debug("Detected journal input, checking for new data.");
                return check_journal_input(uploader);
        }

        return 0;
}
369 
/* Attach journal j to the uploader and try to start uploading from it.
 *
 *   u             uploader; j is stored in u->journal
 *   j             journal handle to read from
 *   cursor        if non-NULL, position to seek to before reading
 *   after_cursor  if true, one entry is skipped before uploading starts
 *                 (passed on as the skip count to process_journal_input())
 *   follow        if true, an I/O event source is registered on the journal
 *                 file descriptor so that dispatch_journal_input() runs when
 *                 the journal signals activity
 *
 * Returns a negative value if setting up the file descriptor, the event
 * source or the cursor fails, otherwise the result of
 * process_journal_input(). */
int open_journal_for_upload(Uploader *u,
                            sd_journal *j,
                            const char *cursor,
                            bool after_cursor,
                            bool follow) {
        int r;

        assert(u);

        u->journal = j;

        /* Per sd_journal_set_data_threshold(3), a threshold of 0 turns the
         * field size limit off, so that fields are not truncated. */
        sd_journal_set_data_threshold(j, 0);

        if (follow) {
                int fd, events;

                fd = sd_journal_get_fd(j);
                if (fd < 0)
                        return log_error_errno(fd, "sd_journal_get_fd failed: %m");

                /* A negative value is an error code, not an event mask, and must
                 * not be handed to sd_event_add_io(). */
                events = sd_journal_get_events(j);
                if (events < 0)
                        return log_error_errno(events, "sd_journal_get_events failed: %m");

                r = sd_journal_reliable_fd(j);
                if (r < 0)
                        return log_error_errno(r, "sd_journal_reliable_fd failed: %m");

                /* If wakeups on the fd are reliable no timeout is used (-1),
                 * otherwise fall back to periodic polling. */
                if (r > 0)
                        u->timeout = -1;
                else
                        u->timeout = JOURNAL_UPLOAD_POLL_TIMEOUT;

                r = sd_event_add_io(u->events, &u->input_event,
                                    fd, events, dispatch_journal_input, u);
                if (r < 0)
                        return log_error_errno(r, "Failed to register input event: %m");

                log_debug("Listening for journal events on fd:%d, timeout %d",
                          fd, u->timeout == UINT64_MAX ? -1 : (int) u->timeout);
        } else
                log_debug("Not listening for journal events.");

        if (cursor) {
                r = sd_journal_seek_cursor(j, cursor);
                if (r < 0)
                        return log_error_errno(r, "Failed to seek to cursor %s: %m",
                                               cursor);
        }

        return process_journal_input(u, !!after_cursor);
}
414