1 extern crate libc;
2 extern crate syscalls;
3
4 use std::{
5 ffi::c_void,
6 mem::{self, size_of},
7 process,
8 ptr::{self, NonNull},
9 sync::atomic::{AtomicI32, Ordering},
10 thread,
11 time::Duration,
12 };
13
14 use syscalls::{
15 syscall0, syscall2, syscall3, syscall6,
16 Sysno::{futex, get_robust_list, gettid, set_robust_list},
17 };
18
19 use libc::{
20 c_int, mmap, perror, EXIT_FAILURE, MAP_ANONYMOUS, MAP_FAILED, MAP_SHARED, PROT_READ, PROT_WRITE,
21 };
22
/// Operation code handed to the `futex` syscall by `futex_wait` to block on a futex word.
const FUTEX_WAIT: usize = 0;
/// Operation code handed to the `futex` syscall by `futex_wake` to wake a waiter on a futex word.
const FUTEX_WAKE: usize = 1;
25
/// Thin wrapper around a raw pointer to an array of `u32` futex words.
///
/// Every accessor takes an `offset` counted in `u32` elements from the base
/// address. The wrapper performs no bounds checking: the caller must make sure
/// `addr + offset` stays inside the memory the pointer was created from.
#[derive(Clone, Copy, Debug)]
struct Futex {
    addr: *mut u32,
}

impl Futex {
    /// Wraps `addr`, the base address of the futex word array.
    pub fn new(addr: *mut u32) -> Self {
        Self { addr }
    }

    /// Returns the address of the word `offset` elements past the base.
    pub fn get_addr(&self, offset: isize) -> *mut u32 {
        // SAFETY: relies on the caller keeping `offset` inside the allocation.
        unsafe { self.addr.offset(offset) }
    }

    /// Reads the word `offset` elements past the base.
    pub fn get_val(&self, offset: isize) -> u32 {
        let slot = self.get_addr(offset);
        // SAFETY: relies on the caller keeping `offset` inside the allocation.
        unsafe { *slot }
    }

    /// Stores `val` into the word `offset` elements past the base.
    pub fn set_val(&self, val: u32, offset: isize) {
        let slot = self.get_addr(offset);
        // SAFETY: relies on the caller keeping `offset` inside the allocation.
        unsafe { *slot = val };
    }
}

// The wrapper is only a raw pointer; these impls let it be moved into and
// shared between threads. Synchronisation is left to the callers.
unsafe impl Send for Futex {}
unsafe impl Sync for Futex {}
54
/// Thin wrapper around a raw pointer to an array of `i32` lock counters.
///
/// Offsets are counted in `i32` elements from the base address. No bounds
/// checking is done: the caller must keep `addr + offset` inside the memory
/// the pointer was created from.
#[derive(Clone, Copy, Debug)]
struct Lock {
    addr: *mut i32,
}

impl Lock {
    /// Wraps `addr`, the base address of the counter array.
    pub fn new(addr: *mut i32) -> Self {
        Self { addr }
    }

    /// Reads the counter `offset` elements past the base.
    pub fn get_val(&self, offset: isize) -> i32 {
        // SAFETY: relies on the caller keeping `offset` inside the allocation.
        unsafe {
            let slot = self.addr.offset(offset);
            *slot
        }
    }

    /// Stores `val` into the counter `offset` elements past the base.
    pub fn set_val(&self, val: i32, offset: isize) {
        // SAFETY: relies on the caller keeping `offset` inside the allocation.
        unsafe {
            let slot = self.addr.offset(offset);
            *slot = val;
        }
    }
}

// The wrapper is only a raw pointer; these impls let it be moved into and
// shared between threads. Synchronisation is left to the callers.
unsafe impl Send for Lock {}
unsafe impl Sync for Lock {}
78
/// One link of the robust futex list: a single pointer to the next entry.
///
/// `#[repr(C)]` pins the layout because the address of this structure is
/// handed to the kernel, which reads it by field position.
#[repr(C)]
#[derive(Debug, Clone, Copy)]
struct RobustList {
    next: *const RobustList,
}

/// Head of the robust futex list that is registered through `set_robust_list`.
///
/// `#[repr(C)]` keeps the fields in declaration order; without it the compiler
/// is free to reorder them, and the kernel would then read the wrong field.
#[repr(C)]
#[derive(Debug, Clone, Copy)]
struct RobustListHead {
    /// First link of the list.
    list: RobustList,
    /// Tells the kernel the relative position of the futex field it has to
    /// inspect. This keeps user space flexible: it can rearrange its data
    /// structures freely without hard-coding any particular offset into the
    /// kernel. In `futexes`, the leading addresses are the ones linked into the
    /// robust list (`list.next`) and the later ones hold the actual futex
    /// values; this field is the displacement from the former to the latter, so
    /// the futex value can be located.
    // Only read by the kernel, never by this program.
    #[allow(dead_code)]
    futex_offset: isize,
    /// Potential race: entries are added to and removed from the list only
    /// after the lock has been acquired, which leaves a small window in which a
    /// thread can exit abnormally and leave the lock dangling. To guard against
    /// that, user space also maintains this simple `list_op_pending` field,
    /// which allows cleanup when a thread dies after taking the lock but before
    /// linking it into the list; it is cleared once the add/remove completes.
    /// This is not exercised here. In the kernel the handling amounts to one
    /// wake-up on the `list_op_pending` address (if there are waiters).
    // Only read by the kernel, never by this program.
    #[allow(dead_code)]
    list_op_pending: *const RobustList,
}
100
error_handle(msg: &str) -> !101 fn error_handle(msg: &str) -> ! {
102 unsafe { perror(msg.as_ptr() as *const i8) };
103 process::exit(EXIT_FAILURE)
104 }
105
futex_wait(futexes: Futex, thread: &str, offset_futex: isize, lock: Lock, offset_count: isize)106 fn futex_wait(futexes: Futex, thread: &str, offset_futex: isize, lock: Lock, offset_count: isize) {
107 loop {
108 let atomic_count = AtomicI32::new(lock.get_val(offset_count));
109 if atomic_count
110 .compare_exchange(1, 0, Ordering::SeqCst, Ordering::SeqCst)
111 .is_ok()
112 {
113 lock.set_val(0, offset_count);
114
115 // 设置futex锁当前被哪个线程占用
116 let tid = unsafe { syscall0(gettid).unwrap() as u32 };
117 futexes.set_val(futexes.get_val(offset_futex) | tid, offset_futex);
118
119 break;
120 }
121
122 println!("{} wating...", thread);
123 let futex_val = futexes.get_val(offset_futex);
124 futexes.set_val(futex_val | 0x8000_0000, offset_futex);
125 let ret = unsafe {
126 syscall6(
127 futex,
128 futexes.get_addr(offset_futex) as usize,
129 FUTEX_WAIT,
130 futexes.get_val(offset_futex) as usize,
131 0,
132 0,
133 0,
134 )
135 };
136 if ret.is_err() {
137 error_handle("futex_wait failed");
138 }
139
140 // 被唤醒后释放锁
141 let atomic_count = AtomicI32::new(lock.get_val(offset_count));
142 if atomic_count
143 .compare_exchange(0, 1, Ordering::SeqCst, Ordering::SeqCst)
144 .is_ok()
145 {
146 lock.set_val(1, offset_count);
147
148 // 释放futex锁,不被任何线程占用
149 futexes.set_val(futexes.get_val(offset_futex) & 0xc000_0000, offset_futex);
150
151 break;
152 }
153 }
154 }
155
futex_wake(futexes: Futex, thread: &str, offset_futex: isize, lock: Lock, offset_count: isize)156 fn futex_wake(futexes: Futex, thread: &str, offset_futex: isize, lock: Lock, offset_count: isize) {
157 let atomic_count = AtomicI32::new(lock.get_val(offset_count));
158 if atomic_count
159 .compare_exchange(0, 1, Ordering::SeqCst, Ordering::SeqCst)
160 .is_ok()
161 {
162 lock.set_val(1, offset_count);
163
164 // 释放futex锁,不被任何线程占用
165 futexes.set_val(futexes.get_val(offset_futex) & 0xc000_0000, offset_futex);
166
167 // 如果没有线程/进程在等这个futex,则不必唤醒, 释放改锁即可
168 let futex_val = futexes.get_val(offset_futex);
169 if futex_val & 0x8000_0000 == 0 {
170 return;
171 }
172
173 futexes.set_val(futex_val & !(1 << 31), offset_futex);
174 let ret = unsafe {
175 syscall6(
176 futex,
177 futexes.get_addr(offset_futex) as usize,
178 FUTEX_WAKE,
179 1,
180 0,
181 0,
182 0,
183 )
184 };
185 if ret.is_err() {
186 error_handle("futex wake failed");
187 }
188 println!("{} waked", thread);
189 }
190 }
191
set_list(futexes: Futex)192 fn set_list(futexes: Futex) {
193 let head = RobustListHead {
194 list: RobustList { next: ptr::null() },
195 futex_offset: 44,
196 list_op_pending: ptr::null(),
197 };
198 let head = NonNull::from(&head).as_ptr();
199 unsafe {
200 // 加入第一个futex
201 let head_ref_mut = &mut *head;
202 head_ref_mut.list.next = futexes.get_addr(0) as *const RobustList;
203
204 // 加入第二个futex
205 let list_2 = NonNull::from(&*head_ref_mut.list.next).as_ptr();
206 let list_2_ref_mut = &mut *list_2;
207 list_2_ref_mut.next = futexes.get_addr(1) as *const RobustList;
208
209 //println!("robust list next: {:?}", (*head).list.next );
210 //println!("robust list next next: {:?}", (*(*head).list.next).next );
211
212 // 向内核注册robust list
213 let len = mem::size_of::<*mut RobustListHead>();
214 let ret = syscall2(set_robust_list, head as usize, len);
215 if ret.is_err() {
216 println!("failed to set_robust_list, ret = {:?}", ret);
217 }
218 }
219 }
220
main()221 fn main() {
222 test01();
223
224 println!("-------------");
225
226 test02();
227
228 println!("-------------");
229 }
230
231 //测试set_robust_list和get_robust_list两个系统调用是否能正常使用
test01()232 fn test01() {
233 // 创建robust list 头指针
234 let head = RobustListHead {
235 list: RobustList { next: ptr::null() },
236 futex_offset: 8,
237 list_op_pending: ptr::null(),
238 };
239 let head = NonNull::from(&head).as_ptr();
240
241 let futexes = unsafe {
242 mmap(
243 ptr::null_mut::<c_void>(),
244 (size_of::<c_int>() * 2) as libc::size_t,
245 PROT_READ | PROT_WRITE,
246 MAP_ANONYMOUS | MAP_SHARED,
247 -1,
248 0,
249 ) as *mut u32
250 };
251 if futexes == MAP_FAILED as *mut u32 {
252 error_handle("futexes_addr mmap failed");
253 }
254
255 unsafe {
256 futexes.offset(11).write(0x0000_0000);
257 futexes.offset(12).write(0x8000_0000);
258 println!("futex1 next addr: {:?}", futexes.offset(0));
259 println!("futex2 next addr: {:?}", futexes.offset(1));
260 println!("futex1 val addr: {:?}", futexes.offset(11));
261 println!("futex2 val addr: {:?}", futexes.offset(12));
262 println!("futex1 val: {:#x?}", futexes.offset(11).read());
263 println!("futex2 val: {:#x?}", futexes.offset(12).read());
264 }
265
266 // 打印注册之前的robust list
267 println!("robust list next(get behind): {:?}", &unsafe { *head });
268
269 unsafe {
270 let head_ref_mut = &mut *head;
271 head_ref_mut.list.next = futexes.offset(0) as *const RobustList;
272 let list_2 = NonNull::from(&*head_ref_mut.list.next).as_ptr();
273 let list_2_ref_mut = &mut *list_2;
274 list_2_ref_mut.next = futexes.offset(1) as *const RobustList;
275 println!("robust list next addr: {:?}", (*head).list.next);
276 println!(
277 "robust list next next addr: {:?}",
278 (*(*head).list.next).next
279 );
280 }
281
282 unsafe {
283 let len = mem::size_of::<*mut RobustListHead>();
284 let ret = syscall2(set_robust_list, head as usize, len);
285 if ret.is_err() {
286 println!("failed to set_robust_list, ret = {:?}", ret);
287 }
288 }
289
290 println!("get before, set after: {:?}", head);
291 println!("get before, set after: {:?}", &unsafe { *head });
292 unsafe {
293 let len: usize = 0;
294 println!("len = {}", len);
295 let len_ptr = NonNull::from(&len).as_ptr();
296 let ret = syscall3(get_robust_list, 0, head as usize, len_ptr as usize);
297 println!("get len = {}", len);
298 if ret.is_err() {
299 println!("failed to get_robust_list, ret = {:?}", ret);
300 }
301
302 println!("futex1 val: {:#x}", futexes.offset(11).read());
303 println!("futex2 val: {:#x}", futexes.offset(12).read());
304 println!("robust list next: {:?}", futexes.offset(0));
305 println!("robust list next next: {:#x?}", futexes.offset(0).read());
306 }
307 println!("robust list head(get after): {:?}", head);
308 println!("robust list next(get after): {:?}", &unsafe { *head });
309 }
310
311 //测试一个线程异常退出时futex的robustness(多线程测试,目前futex还不支持多进程)
test02()312 fn test02() {
313 let futexes = unsafe {
314 mmap(
315 ptr::null_mut::<c_void>(),
316 (size_of::<c_int>() * 2) as libc::size_t,
317 PROT_READ | PROT_WRITE,
318 MAP_ANONYMOUS | MAP_SHARED,
319 -1,
320 0,
321 ) as *mut u32
322 };
323 if futexes == MAP_FAILED as *mut u32 {
324 error_handle("mmap failed");
325 }
326 let count = unsafe {
327 mmap(
328 ptr::null_mut::<c_void>(),
329 (size_of::<c_int>() * 2) as libc::size_t,
330 PROT_READ | PROT_WRITE,
331 MAP_ANONYMOUS | MAP_SHARED,
332 -1,
333 0,
334 ) as *mut i32
335 };
336 if count == MAP_FAILED as *mut i32 {
337 error_handle("mmap failed");
338 }
339
340 unsafe {
341 // 在这个示例中,第一段和第二段地址放入robust list,第11段地址和第12段地址存放futex val
342 futexes.offset(11).write(0x0000_0000);
343 futexes.offset(12).write(0x0000_0000);
344 println!("futex1 next addr: {:?}", futexes.offset(0));
345 println!("futex2 next addr: {:?}", futexes.offset(1));
346 println!("futex1 val addr: {:?}", futexes.offset(11));
347 println!("futex2 val addr: {:?}", futexes.offset(12));
348 println!("futex1 val: {:#x?}", futexes.offset(11).read());
349 println!("futex2 val: {:#x?}", futexes.offset(12).read());
350
351 count.offset(0).write(1);
352 count.offset(1).write(0);
353 println!("count1 val: {:?}", count.offset(0).read());
354 println!("count2 val: {:?}", count.offset(1).read());
355 }
356
357 let futexes = Futex::new(futexes);
358 let locks = Lock::new(count);
359
360 // tid1 = 7
361 let thread1 = thread::spawn(move || {
362 set_list(futexes);
363 thread::sleep(Duration::from_secs(2));
364 for i in 0..2 {
365 futex_wait(futexes, "thread1", 11, locks, 0);
366 println!("thread1 times: {}", i);
367 thread::sleep(Duration::from_secs(3));
368
369 let tid = unsafe { syscall0(gettid).unwrap() as u32 };
370 futexes.set_val(futexes.get_val(12) | tid, 12);
371
372 if i == 1 {
373 // 让thread1异常退出,从而无法唤醒thread2,检测robustness
374 println!("Thread1 exiting early due to simulated error.");
375 return;
376 }
377 futex_wake(futexes, "thread2", 12, locks, 1);
378 }
379 });
380
381 // tid2 = 6
382 set_list(futexes);
383 for i in 0..2 {
384 futex_wait(futexes, "thread2", 12, locks, 1);
385 println!("thread2 times: {}", i);
386
387 let tid = unsafe { syscall0(gettid).unwrap() as u32 };
388 futexes.set_val(futexes.get_val(11) | tid, 11);
389
390 futex_wake(futexes, "thread1", 11, locks, 0);
391 }
392
393 thread1.join().unwrap();
394 }
395