1 extern crate libc; 2 extern crate syscalls; 3 4 use std::{ 5 ffi::c_void, 6 mem::{self, size_of}, 7 process, 8 ptr::{self, NonNull}, 9 sync::atomic::{AtomicI32, Ordering}, 10 thread, 11 time::Duration, 12 }; 13 14 use syscalls::{ 15 syscall0, syscall2, syscall3, syscall6, 16 Sysno::{futex, get_robust_list, gettid, set_robust_list}, 17 }; 18 19 use libc::{ 20 c_int, mmap, perror, EXIT_FAILURE, MAP_ANONYMOUS, MAP_FAILED, MAP_SHARED, PROT_READ, PROT_WRITE, 21 }; 22 23 const FUTEX_WAIT: usize = 0; 24 const FUTEX_WAKE: usize = 1; 25 26 // 封装futex 27 #[derive(Clone, Copy, Debug)] 28 struct Futex { 29 addr: *mut u32, 30 } 31 32 impl Futex { 33 pub fn new(addr: *mut u32) -> Self { 34 return Futex { addr }; 35 } 36 37 pub fn get_addr(&self, offset: isize) -> *mut u32 { 38 return unsafe { self.addr.offset(offset) }; 39 } 40 41 pub fn get_val(&self, offset: isize) -> u32 { 42 return unsafe { self.addr.offset(offset).read() }; 43 } 44 45 pub fn set_val(&self, val: u32, offset: isize) { 46 unsafe { 47 self.addr.offset(offset).write(val); 48 } 49 } 50 } 51 52 unsafe impl Send for Futex {} 53 unsafe impl Sync for Futex {} 54 55 #[derive(Clone, Copy, Debug)] 56 struct Lock { 57 addr: *mut i32, 58 } 59 60 impl Lock { 61 pub fn new(addr: *mut i32) -> Self { 62 return Lock { addr }; 63 } 64 65 pub fn get_val(&self, offset: isize) -> i32 { 66 return unsafe { self.addr.offset(offset).read() }; 67 } 68 69 pub fn set_val(&self, val: i32, offset: isize) { 70 unsafe { 71 self.addr.offset(offset).write(val); 72 } 73 } 74 } 75 76 unsafe impl Send for Lock {} 77 unsafe impl Sync for Lock {} 78 79 #[derive(Debug, Clone, Copy)] 80 struct RobustList { 81 next: *const RobustList, 82 } 83 84 #[derive(Debug, Clone, Copy)] 85 struct RobustListHead { 86 list: RobustList, 87 /// 向kernel提供了要检查的futex字段的相对位置,保持用户空间的灵活性,可以自由 88 /// 地调整其数据结构,而无需向内核硬编码任何特定的偏移量 89 /// futexes中前面的地址是用来存入robust list中(list.next),后面是存放具体的futex val 90 /// 这个字段的作用就是从前面的地址偏移到后面的地址中从而获取futex val 91 #[allow(dead_code)] 92 futex_offset: isize, 93 /// 潜在的竞争条件:由于添加和删除列表是在获取锁之后进行的,这給线程留下了一个小窗口,在此期间可能会导致异常退出, 94 /// 从而使锁被悬挂,为了防止这种可能性。用户空间还维护了一个简单的list_op_pending字段,允许线程在获取锁后但还未添加到 95 /// 列表时就异常退出时进行清理。并且在完成列表添加或删除操作后将其清除 96 /// 这里没有测试这个,在内核中实现实际上就是把list_op_pending地址进行一次唤醒(如果有等待者) 97 #[allow(dead_code)] 98 list_op_pending: *const RobustList, 99 } 100 101 fn error_handle(msg: &str) -> ! { 102 unsafe { perror(msg.as_ptr() as *const i8) }; 103 process::exit(EXIT_FAILURE) 104 } 105 106 fn futex_wait(futexes: Futex, thread: &str, offset_futex: isize, lock: Lock, offset_count: isize) { 107 loop { 108 let atomic_count = AtomicI32::new(lock.get_val(offset_count)); 109 if atomic_count 110 .compare_exchange(1, 0, Ordering::SeqCst, Ordering::SeqCst) 111 .is_ok() 112 { 113 lock.set_val(0, offset_count); 114 115 // 设置futex锁当前被哪个线程占用 116 let tid = unsafe { syscall0(gettid).unwrap() as u32 }; 117 futexes.set_val(futexes.get_val(offset_futex) | tid, offset_futex); 118 119 break; 120 } 121 122 println!("{} wating...", thread); 123 let futex_val = futexes.get_val(offset_futex); 124 futexes.set_val(futex_val | 0x8000_0000, offset_futex); 125 let ret = unsafe { 126 syscall6( 127 futex, 128 futexes.get_addr(offset_futex) as usize, 129 FUTEX_WAIT, 130 futexes.get_val(offset_futex) as usize, 131 0, 132 0, 133 0, 134 ) 135 }; 136 if ret.is_err() { 137 error_handle("futex_wait failed"); 138 } 139 140 // 被唤醒后释放锁 141 let atomic_count = AtomicI32::new(lock.get_val(offset_count)); 142 if atomic_count 143 .compare_exchange(0, 1, Ordering::SeqCst, Ordering::SeqCst) 144 .is_ok() 145 { 146 lock.set_val(1, offset_count); 147 148 // 释放futex锁,不被任何线程占用 149 futexes.set_val(futexes.get_val(offset_futex) & 0xc000_0000, offset_futex); 150 151 break; 152 } 153 } 154 } 155 156 fn futex_wake(futexes: Futex, thread: &str, offset_futex: isize, lock: Lock, offset_count: isize) { 157 let atomic_count = AtomicI32::new(lock.get_val(offset_count)); 158 if atomic_count 159 .compare_exchange(0, 1, Ordering::SeqCst, Ordering::SeqCst) 160 .is_ok() 161 { 162 lock.set_val(1, offset_count); 163 164 // 释放futex锁,不被任何线程占用 165 futexes.set_val(futexes.get_val(offset_futex) & 0xc000_0000, offset_futex); 166 167 // 如果没有线程/进程在等这个futex,则不必唤醒, 释放改锁即可 168 let futex_val = futexes.get_val(offset_futex); 169 if futex_val & 0x8000_0000 == 0 { 170 return; 171 } 172 173 futexes.set_val(futex_val & !(1 << 31), offset_futex); 174 let ret = unsafe { 175 syscall6( 176 futex, 177 futexes.get_addr(offset_futex) as usize, 178 FUTEX_WAKE, 179 1, 180 0, 181 0, 182 0, 183 ) 184 }; 185 if ret.is_err() { 186 error_handle("futex wake failed"); 187 } 188 println!("{} waked", thread); 189 } 190 } 191 192 fn set_list(futexes: Futex) { 193 let head = RobustListHead { 194 list: RobustList { next: ptr::null() }, 195 futex_offset: 44, 196 list_op_pending: ptr::null(), 197 }; 198 let head = NonNull::from(&head).as_ptr(); 199 unsafe { 200 // 加入第一个futex 201 let head_ref_mut = &mut *head; 202 head_ref_mut.list.next = futexes.get_addr(0) as *const RobustList; 203 204 // 加入第二个futex 205 let list_2 = NonNull::from(&*head_ref_mut.list.next).as_ptr(); 206 let list_2_ref_mut = &mut *list_2; 207 list_2_ref_mut.next = futexes.get_addr(1) as *const RobustList; 208 209 //println!("robust list next: {:?}", (*head).list.next ); 210 //println!("robust list next next: {:?}", (*(*head).list.next).next ); 211 212 // 向内核注册robust list 213 let len = mem::size_of::<*mut RobustListHead>(); 214 let ret = syscall2(set_robust_list, head as usize, len); 215 if ret.is_err() { 216 println!("failed to set_robust_list, ret = {:?}", ret); 217 } 218 } 219 } 220 221 fn main() { 222 test01(); 223 224 println!("-------------"); 225 226 test02(); 227 228 println!("-------------"); 229 } 230 231 //测试set_robust_list和get_robust_list两个系统调用是否能正常使用 232 fn test01() { 233 // 创建robust list 头指针 234 let head = RobustListHead { 235 list: RobustList { next: ptr::null() }, 236 futex_offset: 8, 237 list_op_pending: ptr::null(), 238 }; 239 let head = NonNull::from(&head).as_ptr(); 240 241 let futexes = unsafe { 242 mmap( 243 ptr::null_mut::<c_void>(), 244 (size_of::<c_int>() * 2) as libc::size_t, 245 PROT_READ | PROT_WRITE, 246 MAP_ANONYMOUS | MAP_SHARED, 247 -1, 248 0, 249 ) as *mut u32 250 }; 251 if futexes == MAP_FAILED as *mut u32 { 252 error_handle("futexes_addr mmap failed"); 253 } 254 255 unsafe { 256 futexes.offset(11).write(0x0000_0000); 257 futexes.offset(12).write(0x8000_0000); 258 println!("futex1 next addr: {:?}", futexes.offset(0)); 259 println!("futex2 next addr: {:?}", futexes.offset(1)); 260 println!("futex1 val addr: {:?}", futexes.offset(11)); 261 println!("futex2 val addr: {:?}", futexes.offset(12)); 262 println!("futex1 val: {:#x?}", futexes.offset(11).read()); 263 println!("futex2 val: {:#x?}", futexes.offset(12).read()); 264 } 265 266 // 打印注册之前的robust list 267 println!("robust list next(get behind): {:?}", &unsafe { *head }); 268 269 unsafe { 270 let head_ref_mut = &mut *head; 271 head_ref_mut.list.next = futexes.offset(0) as *const RobustList; 272 let list_2 = NonNull::from(&*head_ref_mut.list.next).as_ptr(); 273 let list_2_ref_mut = &mut *list_2; 274 list_2_ref_mut.next = futexes.offset(1) as *const RobustList; 275 println!("robust list next addr: {:?}", (*head).list.next); 276 println!( 277 "robust list next next addr: {:?}", 278 (*(*head).list.next).next 279 ); 280 } 281 282 unsafe { 283 let len = mem::size_of::<*mut RobustListHead>(); 284 let ret = syscall2(set_robust_list, head as usize, len); 285 if ret.is_err() { 286 println!("failed to set_robust_list, ret = {:?}", ret); 287 } 288 } 289 290 println!("get before, set after: {:?}", head); 291 println!("get before, set after: {:?}", &unsafe { *head }); 292 unsafe { 293 let len: usize = 0; 294 println!("len = {}", len); 295 let len_ptr = NonNull::from(&len).as_ptr(); 296 let ret = syscall3(get_robust_list, 0, head as usize, len_ptr as usize); 297 println!("get len = {}", len); 298 if ret.is_err() { 299 println!("failed to get_robust_list, ret = {:?}", ret); 300 } 301 302 println!("futex1 val: {:#x}", futexes.offset(11).read()); 303 println!("futex2 val: {:#x}", futexes.offset(12).read()); 304 println!("robust list next: {:?}", futexes.offset(0)); 305 println!("robust list next next: {:#x?}", futexes.offset(0).read()); 306 } 307 println!("robust list head(get after): {:?}", head); 308 println!("robust list next(get after): {:?}", &unsafe { *head }); 309 } 310 311 //测试一个线程异常退出时futex的robustness(多线程测试,目前futex还不支持多进程) 312 fn test02() { 313 let futexes = unsafe { 314 mmap( 315 ptr::null_mut::<c_void>(), 316 (size_of::<c_int>() * 2) as libc::size_t, 317 PROT_READ | PROT_WRITE, 318 MAP_ANONYMOUS | MAP_SHARED, 319 -1, 320 0, 321 ) as *mut u32 322 }; 323 if futexes == MAP_FAILED as *mut u32 { 324 error_handle("mmap failed"); 325 } 326 let count = unsafe { 327 mmap( 328 ptr::null_mut::<c_void>(), 329 (size_of::<c_int>() * 2) as libc::size_t, 330 PROT_READ | PROT_WRITE, 331 MAP_ANONYMOUS | MAP_SHARED, 332 -1, 333 0, 334 ) as *mut i32 335 }; 336 if count == MAP_FAILED as *mut i32 { 337 error_handle("mmap failed"); 338 } 339 340 unsafe { 341 // 在这个示例中,第一段和第二段地址放入robust list,第11段地址和第12段地址存放futex val 342 futexes.offset(11).write(0x0000_0000); 343 futexes.offset(12).write(0x0000_0000); 344 println!("futex1 next addr: {:?}", futexes.offset(0)); 345 println!("futex2 next addr: {:?}", futexes.offset(1)); 346 println!("futex1 val addr: {:?}", futexes.offset(11)); 347 println!("futex2 val addr: {:?}", futexes.offset(12)); 348 println!("futex1 val: {:#x?}", futexes.offset(11).read()); 349 println!("futex2 val: {:#x?}", futexes.offset(12).read()); 350 351 count.offset(0).write(1); 352 count.offset(1).write(0); 353 println!("count1 val: {:?}", count.offset(0).read()); 354 println!("count2 val: {:?}", count.offset(1).read()); 355 } 356 357 let futexes = Futex::new(futexes); 358 let locks = Lock::new(count); 359 360 // tid1 = 7 361 let thread1 = thread::spawn(move || { 362 set_list(futexes); 363 thread::sleep(Duration::from_secs(2)); 364 for i in 0..2 { 365 futex_wait(futexes, "thread1", 11, locks, 0); 366 println!("thread1 times: {}", i); 367 thread::sleep(Duration::from_secs(3)); 368 369 let tid = unsafe { syscall0(gettid).unwrap() as u32 }; 370 futexes.set_val(futexes.get_val(12) | tid, 12); 371 372 if i == 1 { 373 // 让thread1异常退出,从而无法唤醒thread2,检测robustness 374 println!("Thread1 exiting early due to simulated error."); 375 return; 376 } 377 futex_wake(futexes, "thread2", 12, locks, 1); 378 } 379 }); 380 381 // tid2 = 6 382 set_list(futexes); 383 for i in 0..2 { 384 futex_wait(futexes, "thread2", 12, locks, 1); 385 println!("thread2 times: {}", i); 386 387 let tid = unsafe { syscall0(gettid).unwrap() as u32 }; 388 futexes.set_val(futexes.get_val(11) | tid, 11); 389 390 futex_wake(futexes, "thread1", 11, locks, 0); 391 } 392 393 thread1.join().unwrap(); 394 } 395