1 /* SPDX-License-Identifier: LGPL-2.1-or-later */
2 
3 #include <errno.h>
4 #include <fcntl.h>
5 #include <stdbool.h>
6 #include <stdint.h>
7 #include <stdlib.h>
8 #include <sys/eventfd.h>
9 #include <sys/types.h>
10 #include <unistd.h>
11 
12 #include "barrier.h"
13 #include "errno-util.h"
14 #include "fd-util.h"
15 #include "io-util.h"
16 #include "macro.h"
17 
18 /**
19  * Barriers
20  * This barrier implementation provides a simple synchronization method based
21  * on file-descriptors that can safely be used between threads and processes. A
22  * barrier object contains 2 shared counters based on eventfd. Both processes
23  * can now place barriers and wait for the other end to reach a random or
24  * specific barrier.
25  * Barriers are numbered, so you can either wait for the other end to reach any
26  * barrier or the last barrier that you placed. This way, you can use barriers
 * for one-way *and* full synchronization. Note that even though barriers are
28  * numbered, these numbers are internal and recycled once both sides reached the
29  * same barrier (implemented as a simple signed counter). It is thus not
30  * possible to address barriers by their ID.
31  *
32  * Barrier-API: Both ends can place as many barriers via barrier_place() as
33  * they want and each pair of barriers on both sides will be implicitly linked.
34  * Each side can use the barrier_wait/sync_*() family of calls to wait for the
35  * other side to place a specific barrier. barrier_wait_next() waits until the
36  * other side calls barrier_place(). No links between the barriers are
37  * considered and this simply serves as most basic asynchronous barrier.
38  * barrier_sync_next() is like barrier_wait_next() and waits for the other side
39  * to place their next barrier via barrier_place(). However, it only waits for
40  * barriers that are linked to a barrier we already placed. If the other side
41  * already placed more barriers than we did, barrier_sync_next() returns
42  * immediately.
43  * barrier_sync() extends barrier_sync_next() and waits until the other end
44  * placed as many barriers via barrier_place() as we did. If they already placed
45  * as many as we did (or more), it returns immediately.
46  *
 * In addition to basic barriers, an abortion event is available.
48  * barrier_abort() places an abortion event that cannot be undone. An abortion
49  * immediately cancels all placed barriers and replaces them. Any running and
50  * following wait/sync call besides barrier_wait_abortion() will immediately
51  * return false on both sides (otherwise, they always return true).
52  * barrier_abort() can be called multiple times on both ends and will be a
53  * no-op if already called on this side.
54  * barrier_wait_abortion() can be used to wait for the other side to call
55  * barrier_abort() and is the only wait/sync call that does not return
 * immediately if we aborted ourselves. It only returns once the other side
57  * called barrier_abort().
58  *
59  * Barriers can be used for in-process and inter-process synchronization.
60  * However, for in-process synchronization you could just use mutexes.
61  * Therefore, main target is IPC and we require both sides to *not* share the FD
 * table. If that's given, barriers also track the remote side: if the remote side
63  * exit()s, an abortion event is implicitly queued on the other side. This way,
64  * a sync/wait call will be woken up if the remote side crashed or exited
65  * unexpectedly. However, note that these abortion events are only queued if the
66  * barrier-queue has been drained. Therefore, it is safe to place a barrier and
67  * exit. The other side can safely wait on the barrier even though the exit
68  * queued an abortion event. Usually, the abortion event would overwrite the
69  * barrier, however, that's not true for exit-abortion events. Those are only
70  * queued if the barrier-queue is drained (thus, the receiving side has placed
71  * more barriers than the remote side).
72  */
73 
74 /**
75  * barrier_create() - Initialize a barrier object
76  * @obj: barrier to initialize
77  *
78  * This initializes a barrier object. The caller is responsible of allocating
79  * the memory and keeping it valid. The memory does not have to be zeroed
80  * beforehand.
81  * Two eventfd objects are allocated for each barrier. If allocation fails, an
82  * error is returned.
83  *
84  * If this function fails, the barrier is reset to an invalid state so it is
85  * safe to call barrier_destroy() on the object regardless whether the
86  * initialization succeeded or not.
87  *
88  * The caller is responsible to destroy the object via barrier_destroy() before
89  * releasing the underlying memory.
90  *
91  * Returns: 0 on success, negative error code on failure.
92  */
barrier_create(Barrier * b)93 int barrier_create(Barrier *b) {
94         _unused_ _cleanup_(barrier_destroyp) Barrier *staging = b;
95         int r;
96 
97         assert(b);
98 
99         b->me = eventfd(0, EFD_CLOEXEC | EFD_NONBLOCK);
100         if (b->me < 0)
101                 return -errno;
102 
103         b->them = eventfd(0, EFD_CLOEXEC | EFD_NONBLOCK);
104         if (b->them < 0)
105                 return -errno;
106 
107         r = pipe2(b->pipe, O_CLOEXEC | O_NONBLOCK);
108         if (r < 0)
109                 return -errno;
110 
111         staging = NULL;
112         return 0;
113 }
114 
115 /**
116  * barrier_destroy() - Destroy a barrier object
117  * @b: barrier to destroy or NULL
118  *
119  * This destroys a barrier object that has previously been passed to
120  * barrier_create(). The object is released and reset to invalid
121  * state. Therefore, it is safe to call barrier_destroy() multiple
122  * times or even if barrier_create() failed. However, barrier must be
123  * always initialized with BARRIER_NULL.
124  *
125  * If @b is NULL, this is a no-op.
126  */
barrier_destroy(Barrier * b)127 Barrier* barrier_destroy(Barrier *b) {
128         if (!b)
129                 return NULL;
130 
131         b->me = safe_close(b->me);
132         b->them = safe_close(b->them);
133         safe_close_pair(b->pipe);
134         b->barriers = 0;
135         return NULL;
136 }
137 
138 /**
139  * barrier_set_role() - Set the local role of the barrier
140  * @b: barrier to operate on
141  * @role: role to set on the barrier
142  *
143  * This sets the roles on a barrier object. This is needed to know
144  * which side of the barrier you're on. Usually, the parent creates
145  * the barrier via barrier_create() and then calls fork() or clone().
146  * Therefore, the FDs are duplicated and the child retains the same
147  * barrier object.
148  *
149  * Both sides need to call barrier_set_role() after fork() or clone()
150  * are done. If this is not done, barriers will not work correctly.
151  *
152  * Note that barriers could be supported without fork() or clone(). However,
153  * this is currently not needed so it hasn't been implemented.
154  */
barrier_set_role(Barrier * b,unsigned role)155 void barrier_set_role(Barrier *b, unsigned role) {
156         assert(b);
157         assert(IN_SET(role, BARRIER_PARENT, BARRIER_CHILD));
158         /* make sure this is only called once */
159         assert(b->pipe[0] >= 0 && b->pipe[1] >= 0);
160 
161         if (role == BARRIER_PARENT)
162                 b->pipe[1] = safe_close(b->pipe[1]);
163         else {
164                 b->pipe[0] = safe_close(b->pipe[0]);
165 
166                 /* swap me/them for children */
167                 SWAP_TWO(b->me, b->them);
168         }
169 }
170 
171 /* places barrier; returns false if we aborted, otherwise true */
barrier_write(Barrier * b,uint64_t buf)172 static bool barrier_write(Barrier *b, uint64_t buf) {
173         ssize_t len;
174 
175         /* prevent new sync-points if we already aborted */
176         if (barrier_i_aborted(b))
177                 return false;
178 
179         assert(b->me >= 0);
180         do {
181                 len = write(b->me, &buf, sizeof(buf));
182         } while (len < 0 && ERRNO_IS_TRANSIENT(errno));
183 
184         if (len != sizeof(buf))
185                 goto error;
186 
187         /* lock if we aborted */
188         if (buf >= (uint64_t)BARRIER_ABORTION) {
189                 if (barrier_they_aborted(b))
190                         b->barriers = BARRIER_WE_ABORTED;
191                 else
192                         b->barriers = BARRIER_I_ABORTED;
193         } else if (!barrier_is_aborted(b))
194                 b->barriers += buf;
195 
196         return !barrier_i_aborted(b);
197 
198 error:
199         /* If there is an unexpected error, we have to make this fatal. There
200          * is no way we can recover from sync-errors. Therefore, we close the
201          * pipe-ends and treat this as abortion. The other end will notice the
202          * pipe-close and treat it as abortion, too. */
203 
204         safe_close_pair(b->pipe);
205         b->barriers = BARRIER_WE_ABORTED;
206         return false;
207 }
208 
/*
 * barrier_read() - wait until the local barrier counter drops to @comp
 * @b: barrier to operate on
 * @comp: target value; this keeps waiting while b->barriers > @comp
 *
 * Waits (without timeout) for events from the remote side. Data on the @them
 * eventfd carries barriers/abortions written by the remote side. A hang-up or
 * error on the pipe end kept by this side is treated as an implicit abortion,
 * but only if no eventfd data is pending at the same time. Remote barriers
 * decrement b->barriers, and a remote abortion locks the object into
 * THEY_ABORTED (or WE_ABORTED if we had aborted already). Unexpected
 * poll/read errors are fatal: the pipe ends are closed and the state becomes
 * WE_ABORTED.
 *
 * Returns: false if they aborted or a fatal error occurred, otherwise true.
 */
static bool barrier_read(Barrier *b, int64_t comp) {
        /* nothing left to wait for once the remote abortion is recorded */
        if (barrier_they_aborted(b))
                return false;

        while (b->barriers > comp) {
                /* pfd[0]: whichever pipe end this side kept (see
                 * barrier_set_role()); poll reports POLLHUP/POLLERR/POLLNVAL
                 * on it regardless of the requested events.
                 * pfd[1]: readable when the remote side wrote to its eventfd. */
                struct pollfd pfd[2] = {
                        { .fd = b->pipe[0] >= 0 ? b->pipe[0] : b->pipe[1],
                          .events = POLLHUP },
                        { .fd = b->them,
                          .events = POLLIN }};
                uint64_t buf;
                int r;

                /* wait indefinitely; simply retry if interrupted by a signal */
                r = ppoll_usec(pfd, ELEMENTSOF(pfd), USEC_INFINITY);
                if (r == -EINTR)
                        continue;
                if (r < 0)
                        goto error;

                if (pfd[1].revents) {
                        ssize_t len;

                        /* events on @them signal new data for us; reading an
                         * eventfd returns (and resets) the accumulated counter */
                        len = read(b->them, &buf, sizeof(buf));
                        if (len < 0 && ERRNO_IS_TRANSIENT(errno))
                                continue;

                        if (len != sizeof(buf))
                                goto error;
                } else if (pfd[0].revents & (POLLHUP | POLLERR | POLLNVAL))
                        /* POLLHUP on the pipe tells us the other side exited.
                         * We treat this as implicit abortion. But we only
                         * handle it if there's no event on the eventfd. This
                         * guarantees that exit-abortions do not overwrite real
                         * barriers. */
                        buf = BARRIER_ABORTION;
                else
                        continue;

                /* lock if they aborted */
                if (buf >= (uint64_t)BARRIER_ABORTION) {
                        if (barrier_i_aborted(b))
                                b->barriers = BARRIER_WE_ABORTED;
                        else
                                b->barriers = BARRIER_THEY_ABORTED;
                } else if (!barrier_is_aborted(b))
                        /* each remote barrier cancels one of ours */
                        b->barriers -= buf;
        }

        return !barrier_they_aborted(b);

error:
        /* If there is an unexpected error, we have to make this fatal. There
         * is no way we can recover from sync-errors. Therefore, we close the
         * pipe-ends and treat this as abortion. The other end will notice the
         * pipe-close and treat it as abortion, too. */

        safe_close_pair(b->pipe);
        b->barriers = BARRIER_WE_ABORTED;
        return false;
}
271 
272 /**
273  * barrier_place() - Place a new barrier
274  * @b: barrier object
275  *
276  * This places a new barrier on the barrier object. If either side already
277  * aborted, this is a no-op and returns "false". Otherwise, the barrier is
278  * placed and this returns "true".
279  *
280  * Returns: true if barrier was placed, false if either side aborted.
281  */
barrier_place(Barrier * b)282 bool barrier_place(Barrier *b) {
283         assert(b);
284 
285         if (barrier_is_aborted(b))
286                 return false;
287 
288         barrier_write(b, BARRIER_SINGLE);
289         return true;
290 }
291 
292 /**
293  * barrier_abort() - Abort the synchronization
294  * @b: barrier object to abort
295  *
296  * This aborts the barrier-synchronization. If barrier_abort() was already
297  * called on this side, this is a no-op. Otherwise, the barrier is put into the
298  * ABORT-state and will stay there. The other side is notified about the
299  * abortion. Any following attempt to place normal barriers or to wait on normal
300  * barriers will return immediately as "false".
301  *
302  * You can wait for the other side to call barrier_abort(), too. Use
303  * barrier_wait_abortion() for that.
304  *
305  * Returns: false if the other side already aborted, true otherwise.
306  */
barrier_abort(Barrier * b)307 bool barrier_abort(Barrier *b) {
308         assert(b);
309 
310         barrier_write(b, BARRIER_ABORTION);
311         return !barrier_they_aborted(b);
312 }
313 
314 /**
315  * barrier_wait_next() - Wait for the next barrier of the other side
316  * @b: barrier to operate on
317  *
318  * This waits until the other side places its next barrier. This is independent
319  * of any barrier-links and just waits for any next barrier of the other side.
320  *
321  * If either side aborted, this returns false.
322  *
323  * Returns: false if either side aborted, true otherwise.
324  */
barrier_wait_next(Barrier * b)325 bool barrier_wait_next(Barrier *b) {
326         assert(b);
327 
328         if (barrier_is_aborted(b))
329                 return false;
330 
331         barrier_read(b, b->barriers - 1);
332         return !barrier_is_aborted(b);
333 }
334 
335 /**
336  * barrier_wait_abortion() - Wait for the other side to abort
337  * @b: barrier to operate on
338  *
339  * This waits until the other side called barrier_abort(). This can be called
340  * regardless whether the local side already called barrier_abort() or not.
341  *
342  * If the other side has already aborted, this returns immediately.
343  *
344  * Returns: false if the local side aborted, true otherwise.
345  */
barrier_wait_abortion(Barrier * b)346 bool barrier_wait_abortion(Barrier *b) {
347         assert(b);
348 
349         barrier_read(b, BARRIER_THEY_ABORTED);
350         return !barrier_i_aborted(b);
351 }
352 
353 /**
354  * barrier_sync_next() - Wait for the other side to place a next linked barrier
355  * @b: barrier to operate on
356  *
357  * This is like barrier_wait_next() and waits for the other side to call
358  * barrier_place(). However, this only waits for linked barriers. That means, if
359  * the other side already placed more barriers than (or as much as) we did, this
360  * returns immediately instead of waiting.
361  *
362  * If either side aborted, this returns false.
363  *
364  * Returns: false if either side aborted, true otherwise.
365  */
barrier_sync_next(Barrier * b)366 bool barrier_sync_next(Barrier *b) {
367         assert(b);
368 
369         if (barrier_is_aborted(b))
370                 return false;
371 
372         barrier_read(b, MAX((int64_t)0, b->barriers - 1));
373         return !barrier_is_aborted(b);
374 }
375 
376 /**
377  * barrier_sync() - Wait for the other side to place as many barriers as we did
378  * @b: barrier to operate on
379  *
380  * This is like barrier_sync_next() but waits for the other side to call
381  * barrier_place() as often as we did (in total). If they already placed as much
382  * as we did (or more), this returns immediately instead of waiting.
383  *
384  * If either side aborted, this returns false.
385  *
386  * Returns: false if either side aborted, true otherwise.
387  */
barrier_sync(Barrier * b)388 bool barrier_sync(Barrier *b) {
389         assert(b);
390 
391         if (barrier_is_aborted(b))
392                 return false;
393 
394         barrier_read(b, 0);
395         return !barrier_is_aborted(b);
396 }
397