1 /* SPDX-License-Identifier: LGPL-2.1-or-later */
2
3 #include <errno.h>
4 #include <fcntl.h>
5 #include <stdbool.h>
6 #include <stdint.h>
7 #include <stdlib.h>
8 #include <sys/eventfd.h>
9 #include <sys/types.h>
10 #include <unistd.h>
11
12 #include "barrier.h"
13 #include "errno-util.h"
14 #include "fd-util.h"
15 #include "io-util.h"
16 #include "macro.h"
17
18 /**
19 * Barriers
20 * This barrier implementation provides a simple synchronization method based
21 * on file-descriptors that can safely be used between threads and processes. A
22 * barrier object contains 2 shared counters based on eventfd. Both processes
 * can place barriers and wait for the other end to reach either any barrier or
 * a specific one.
25 * Barriers are numbered, so you can either wait for the other end to reach any
26 * barrier or the last barrier that you placed. This way, you can use barriers
 * for one-way *and* full synchronization. Note that even though barriers are
28 * numbered, these numbers are internal and recycled once both sides reached the
29 * same barrier (implemented as a simple signed counter). It is thus not
30 * possible to address barriers by their ID.
31 *
32 * Barrier-API: Both ends can place as many barriers via barrier_place() as
33 * they want and each pair of barriers on both sides will be implicitly linked.
34 * Each side can use the barrier_wait/sync_*() family of calls to wait for the
35 * other side to place a specific barrier. barrier_wait_next() waits until the
36 * other side calls barrier_place(). No links between the barriers are
37 * considered and this simply serves as most basic asynchronous barrier.
38 * barrier_sync_next() is like barrier_wait_next() and waits for the other side
39 * to place their next barrier via barrier_place(). However, it only waits for
40 * barriers that are linked to a barrier we already placed. If the other side
 * already placed as many barriers as we did (or more), barrier_sync_next()
 * returns immediately.
43 * barrier_sync() extends barrier_sync_next() and waits until the other end
44 * placed as many barriers via barrier_place() as we did. If they already placed
45 * as many as we did (or more), it returns immediately.
46 *
 * In addition to basic barriers, an abortion event is available.
48 * barrier_abort() places an abortion event that cannot be undone. An abortion
49 * immediately cancels all placed barriers and replaces them. Any running and
50 * following wait/sync call besides barrier_wait_abortion() will immediately
51 * return false on both sides (otherwise, they always return true).
52 * barrier_abort() can be called multiple times on both ends and will be a
53 * no-op if already called on this side.
54 * barrier_wait_abortion() can be used to wait for the other side to call
55 * barrier_abort() and is the only wait/sync call that does not return
 * immediately if we aborted ourselves. It only returns once the other side
57 * called barrier_abort().
58 *
59 * Barriers can be used for in-process and inter-process synchronization.
60 * However, for in-process synchronization you could just use mutexes.
61 * Therefore, main target is IPC and we require both sides to *not* share the FD
62 * table. If that's given, barriers provide target tracking: If the remote side
63 * exit()s, an abortion event is implicitly queued on the other side. This way,
64 * a sync/wait call will be woken up if the remote side crashed or exited
65 * unexpectedly. However, note that these abortion events are only queued if the
66 * barrier-queue has been drained. Therefore, it is safe to place a barrier and
67 * exit. The other side can safely wait on the barrier even though the exit
68 * queued an abortion event. Usually, the abortion event would overwrite the
69 * barrier, however, that's not true for exit-abortion events. Those are only
70 * queued if the barrier-queue is drained (thus, the receiving side has placed
71 * more barriers than the remote side).
72 */
73
74 /**
75 * barrier_create() - Initialize a barrier object
76 * @obj: barrier to initialize
77 *
78 * This initializes a barrier object. The caller is responsible of allocating
79 * the memory and keeping it valid. The memory does not have to be zeroed
80 * beforehand.
81 * Two eventfd objects are allocated for each barrier. If allocation fails, an
82 * error is returned.
83 *
84 * If this function fails, the barrier is reset to an invalid state so it is
85 * safe to call barrier_destroy() on the object regardless whether the
86 * initialization succeeded or not.
87 *
88 * The caller is responsible to destroy the object via barrier_destroy() before
89 * releasing the underlying memory.
90 *
91 * Returns: 0 on success, negative error code on failure.
92 */
barrier_create(Barrier * b)93 int barrier_create(Barrier *b) {
94 _unused_ _cleanup_(barrier_destroyp) Barrier *staging = b;
95 int r;
96
97 assert(b);
98
99 b->me = eventfd(0, EFD_CLOEXEC | EFD_NONBLOCK);
100 if (b->me < 0)
101 return -errno;
102
103 b->them = eventfd(0, EFD_CLOEXEC | EFD_NONBLOCK);
104 if (b->them < 0)
105 return -errno;
106
107 r = pipe2(b->pipe, O_CLOEXEC | O_NONBLOCK);
108 if (r < 0)
109 return -errno;
110
111 staging = NULL;
112 return 0;
113 }
114
115 /**
116 * barrier_destroy() - Destroy a barrier object
117 * @b: barrier to destroy or NULL
118 *
119 * This destroys a barrier object that has previously been passed to
120 * barrier_create(). The object is released and reset to invalid
121 * state. Therefore, it is safe to call barrier_destroy() multiple
122 * times or even if barrier_create() failed. However, barrier must be
123 * always initialized with BARRIER_NULL.
124 *
125 * If @b is NULL, this is a no-op.
126 */
barrier_destroy(Barrier * b)127 Barrier* barrier_destroy(Barrier *b) {
128 if (!b)
129 return NULL;
130
131 b->me = safe_close(b->me);
132 b->them = safe_close(b->them);
133 safe_close_pair(b->pipe);
134 b->barriers = 0;
135 return NULL;
136 }
137
138 /**
139 * barrier_set_role() - Set the local role of the barrier
140 * @b: barrier to operate on
141 * @role: role to set on the barrier
142 *
143 * This sets the roles on a barrier object. This is needed to know
144 * which side of the barrier you're on. Usually, the parent creates
145 * the barrier via barrier_create() and then calls fork() or clone().
146 * Therefore, the FDs are duplicated and the child retains the same
147 * barrier object.
148 *
149 * Both sides need to call barrier_set_role() after fork() or clone()
150 * are done. If this is not done, barriers will not work correctly.
151 *
152 * Note that barriers could be supported without fork() or clone(). However,
153 * this is currently not needed so it hasn't been implemented.
154 */
barrier_set_role(Barrier * b,unsigned role)155 void barrier_set_role(Barrier *b, unsigned role) {
156 assert(b);
157 assert(IN_SET(role, BARRIER_PARENT, BARRIER_CHILD));
158 /* make sure this is only called once */
159 assert(b->pipe[0] >= 0 && b->pipe[1] >= 0);
160
161 if (role == BARRIER_PARENT)
162 b->pipe[1] = safe_close(b->pipe[1]);
163 else {
164 b->pipe[0] = safe_close(b->pipe[0]);
165
166 /* swap me/them for children */
167 SWAP_TWO(b->me, b->them);
168 }
169 }
170
171 /* places barrier; returns false if we aborted, otherwise true */
barrier_write(Barrier * b,uint64_t buf)172 static bool barrier_write(Barrier *b, uint64_t buf) {
173 ssize_t len;
174
175 /* prevent new sync-points if we already aborted */
176 if (barrier_i_aborted(b))
177 return false;
178
179 assert(b->me >= 0);
180 do {
181 len = write(b->me, &buf, sizeof(buf));
182 } while (len < 0 && ERRNO_IS_TRANSIENT(errno));
183
184 if (len != sizeof(buf))
185 goto error;
186
187 /* lock if we aborted */
188 if (buf >= (uint64_t)BARRIER_ABORTION) {
189 if (barrier_they_aborted(b))
190 b->barriers = BARRIER_WE_ABORTED;
191 else
192 b->barriers = BARRIER_I_ABORTED;
193 } else if (!barrier_is_aborted(b))
194 b->barriers += buf;
195
196 return !barrier_i_aborted(b);
197
198 error:
199 /* If there is an unexpected error, we have to make this fatal. There
200 * is no way we can recover from sync-errors. Therefore, we close the
201 * pipe-ends and treat this as abortion. The other end will notice the
202 * pipe-close and treat it as abortion, too. */
203
204 safe_close_pair(b->pipe);
205 b->barriers = BARRIER_WE_ABORTED;
206 return false;
207 }
208
209 /* waits for barriers; returns false if they aborted, otherwise true */
barrier_read(Barrier * b,int64_t comp)210 static bool barrier_read(Barrier *b, int64_t comp) {
211 if (barrier_they_aborted(b))
212 return false;
213
214 while (b->barriers > comp) {
215 struct pollfd pfd[2] = {
216 { .fd = b->pipe[0] >= 0 ? b->pipe[0] : b->pipe[1],
217 .events = POLLHUP },
218 { .fd = b->them,
219 .events = POLLIN }};
220 uint64_t buf;
221 int r;
222
223 r = ppoll_usec(pfd, ELEMENTSOF(pfd), USEC_INFINITY);
224 if (r == -EINTR)
225 continue;
226 if (r < 0)
227 goto error;
228
229 if (pfd[1].revents) {
230 ssize_t len;
231
232 /* events on @them signal new data for us */
233 len = read(b->them, &buf, sizeof(buf));
234 if (len < 0 && ERRNO_IS_TRANSIENT(errno))
235 continue;
236
237 if (len != sizeof(buf))
238 goto error;
239 } else if (pfd[0].revents & (POLLHUP | POLLERR | POLLNVAL))
240 /* POLLHUP on the pipe tells us the other side exited.
241 * We treat this as implicit abortion. But we only
242 * handle it if there's no event on the eventfd. This
243 * guarantees that exit-abortions do not overwrite real
244 * barriers. */
245 buf = BARRIER_ABORTION;
246 else
247 continue;
248
249 /* lock if they aborted */
250 if (buf >= (uint64_t)BARRIER_ABORTION) {
251 if (barrier_i_aborted(b))
252 b->barriers = BARRIER_WE_ABORTED;
253 else
254 b->barriers = BARRIER_THEY_ABORTED;
255 } else if (!barrier_is_aborted(b))
256 b->barriers -= buf;
257 }
258
259 return !barrier_they_aborted(b);
260
261 error:
262 /* If there is an unexpected error, we have to make this fatal. There
263 * is no way we can recover from sync-errors. Therefore, we close the
264 * pipe-ends and treat this as abortion. The other end will notice the
265 * pipe-close and treat it as abortion, too. */
266
267 safe_close_pair(b->pipe);
268 b->barriers = BARRIER_WE_ABORTED;
269 return false;
270 }
271
272 /**
273 * barrier_place() - Place a new barrier
274 * @b: barrier object
275 *
276 * This places a new barrier on the barrier object. If either side already
277 * aborted, this is a no-op and returns "false". Otherwise, the barrier is
278 * placed and this returns "true".
279 *
280 * Returns: true if barrier was placed, false if either side aborted.
281 */
barrier_place(Barrier * b)282 bool barrier_place(Barrier *b) {
283 assert(b);
284
285 if (barrier_is_aborted(b))
286 return false;
287
288 barrier_write(b, BARRIER_SINGLE);
289 return true;
290 }
291
292 /**
293 * barrier_abort() - Abort the synchronization
294 * @b: barrier object to abort
295 *
296 * This aborts the barrier-synchronization. If barrier_abort() was already
297 * called on this side, this is a no-op. Otherwise, the barrier is put into the
298 * ABORT-state and will stay there. The other side is notified about the
299 * abortion. Any following attempt to place normal barriers or to wait on normal
300 * barriers will return immediately as "false".
301 *
302 * You can wait for the other side to call barrier_abort(), too. Use
303 * barrier_wait_abortion() for that.
304 *
305 * Returns: false if the other side already aborted, true otherwise.
306 */
barrier_abort(Barrier * b)307 bool barrier_abort(Barrier *b) {
308 assert(b);
309
310 barrier_write(b, BARRIER_ABORTION);
311 return !barrier_they_aborted(b);
312 }
313
314 /**
315 * barrier_wait_next() - Wait for the next barrier of the other side
316 * @b: barrier to operate on
317 *
318 * This waits until the other side places its next barrier. This is independent
319 * of any barrier-links and just waits for any next barrier of the other side.
320 *
321 * If either side aborted, this returns false.
322 *
323 * Returns: false if either side aborted, true otherwise.
324 */
barrier_wait_next(Barrier * b)325 bool barrier_wait_next(Barrier *b) {
326 assert(b);
327
328 if (barrier_is_aborted(b))
329 return false;
330
331 barrier_read(b, b->barriers - 1);
332 return !barrier_is_aborted(b);
333 }
334
335 /**
336 * barrier_wait_abortion() - Wait for the other side to abort
337 * @b: barrier to operate on
338 *
339 * This waits until the other side called barrier_abort(). This can be called
340 * regardless whether the local side already called barrier_abort() or not.
341 *
342 * If the other side has already aborted, this returns immediately.
343 *
344 * Returns: false if the local side aborted, true otherwise.
345 */
barrier_wait_abortion(Barrier * b)346 bool barrier_wait_abortion(Barrier *b) {
347 assert(b);
348
349 barrier_read(b, BARRIER_THEY_ABORTED);
350 return !barrier_i_aborted(b);
351 }
352
353 /**
354 * barrier_sync_next() - Wait for the other side to place a next linked barrier
355 * @b: barrier to operate on
356 *
357 * This is like barrier_wait_next() and waits for the other side to call
358 * barrier_place(). However, this only waits for linked barriers. That means, if
359 * the other side already placed more barriers than (or as much as) we did, this
360 * returns immediately instead of waiting.
361 *
362 * If either side aborted, this returns false.
363 *
364 * Returns: false if either side aborted, true otherwise.
365 */
barrier_sync_next(Barrier * b)366 bool barrier_sync_next(Barrier *b) {
367 assert(b);
368
369 if (barrier_is_aborted(b))
370 return false;
371
372 barrier_read(b, MAX((int64_t)0, b->barriers - 1));
373 return !barrier_is_aborted(b);
374 }
375
376 /**
377 * barrier_sync() - Wait for the other side to place as many barriers as we did
378 * @b: barrier to operate on
379 *
380 * This is like barrier_sync_next() but waits for the other side to call
381 * barrier_place() as often as we did (in total). If they already placed as much
382 * as we did (or more), this returns immediately instead of waiting.
383 *
384 * If either side aborted, this returns false.
385 *
386 * Returns: false if either side aborted, true otherwise.
387 */
barrier_sync(Barrier * b)388 bool barrier_sync(Barrier *b) {
389 assert(b);
390
391 if (barrier_is_aborted(b))
392 return false;
393
394 barrier_read(b, 0);
395 return !barrier_is_aborted(b);
396 }
397