17b32f508SLoGin use std::{ 27b32f508SLoGin fs::File, 37b32f508SLoGin mem::size_of, 47b32f508SLoGin os::unix::prelude::FileExt, 57b32f508SLoGin path::PathBuf, 67b32f508SLoGin sync::{atomic::AtomicBool, mpsc, Arc, Mutex, Weak}, 77b32f508SLoGin thread::JoinHandle, 87b32f508SLoGin }; 97b32f508SLoGin 107b32f508SLoGin use klog_types::{AllocatorLog, MMLogChannel}; 117b32f508SLoGin use log::info; 127b32f508SLoGin 137b32f508SLoGin use crate::backend::{ 147b32f508SLoGin loader::Symbol, 157b32f508SLoGin monitor::{logset::LogSet, ObjectWrapper}, 167b32f508SLoGin BackendData, 177b32f508SLoGin }; 187b32f508SLoGin 197b32f508SLoGin #[derive(Debug)] 207b32f508SLoGin pub struct MMLogMonitor { 217b32f508SLoGin channel_symbol: Option<Symbol>, 227b32f508SLoGin shared_data: Arc<Mutex<BackendData>>, 237b32f508SLoGin /// All threads spawned by the mm log monitor. 247b32f508SLoGin threads: Mutex<Vec<JoinHandle<()>>>, 257b32f508SLoGin stop_child_threads: AtomicBool, 267b32f508SLoGin self_ref: Weak<Self>, 277b32f508SLoGin 287b32f508SLoGin mm_log_receiver: Mutex<mpsc::Receiver<MMLogWorkerResult>>, 297b32f508SLoGin mm_log_sender: mpsc::Sender<MMLogWorkerResult>, 307b32f508SLoGin } 317b32f508SLoGin 327b32f508SLoGin impl MMLogMonitor { new(shared_data: Arc<Mutex<BackendData>>) -> Arc<Self>33dcf232f3SLoGin pub(crate) fn new(shared_data: Arc<Mutex<BackendData>>) -> Arc<Self> { 347b32f508SLoGin let guard = shared_data.lock().unwrap(); 357b32f508SLoGin let mm_log_buffer_symbol: Option<Symbol> = guard 367b32f508SLoGin .kernel_metadata 377b32f508SLoGin .as_ref() 387b32f508SLoGin .map(|km| { 397b32f508SLoGin km.sym_collection() 407b32f508SLoGin .find_by_name("__MM_ALLOCATOR_LOG_CHANNEL") 417b32f508SLoGin .map(|s| s.clone()) 427b32f508SLoGin }) 437b32f508SLoGin .flatten(); 447b32f508SLoGin drop(guard); 457b32f508SLoGin 467b32f508SLoGin info!("mm_log_buffer_symbol: {:?}", mm_log_buffer_symbol); 477b32f508SLoGin 487b32f508SLoGin let mm_log_worker_mpsc: ( 497b32f508SLoGin mpsc::Sender<MMLogWorkerResult>, 507b32f508SLoGin mpsc::Receiver<MMLogWorkerResult>, 517b32f508SLoGin ) = mpsc::channel::<MMLogWorkerResult>(); 527b32f508SLoGin 537b32f508SLoGin let r = Self { 547b32f508SLoGin channel_symbol: mm_log_buffer_symbol, 557b32f508SLoGin shared_data, 567b32f508SLoGin threads: Mutex::new(Vec::new()), 577b32f508SLoGin stop_child_threads: AtomicBool::new(false), 587b32f508SLoGin self_ref: Weak::new(), 597b32f508SLoGin mm_log_receiver: Mutex::new(mm_log_worker_mpsc.1), 607b32f508SLoGin mm_log_sender: mm_log_worker_mpsc.0, 617b32f508SLoGin }; 627b32f508SLoGin 637b32f508SLoGin let r = Arc::new(r); 647b32f508SLoGin unsafe { 657b32f508SLoGin let self_ref = Arc::downgrade(&r); 667b32f508SLoGin let r_ptr = r.as_ref() as *const Self as *mut Self; 677b32f508SLoGin (*r_ptr).self_ref = self_ref; 687b32f508SLoGin } 697b32f508SLoGin 707b32f508SLoGin return r; 717b32f508SLoGin } 727b32f508SLoGin run(&self)737b32f508SLoGin pub fn run(&self) { 747b32f508SLoGin info!("MMLogMonitor::run()"); 757b32f508SLoGin 767b32f508SLoGin self.create_threads(); 777b32f508SLoGin 787b32f508SLoGin let mut logs_set = 797b32f508SLoGin LogSet::<usize, ObjectWrapper<AllocatorLog>>::new("mm_allocator_log".to_string(), None); 807b32f508SLoGin 817b32f508SLoGin self.handle_logs(&mut logs_set); 827b32f508SLoGin // std::thread::sleep(std::time::Duration::from_micros(50)); 837b32f508SLoGin } 847b32f508SLoGin handle_logs(&self, logs_set: &mut LogSet<usize, ObjectWrapper<AllocatorLog>>)857b32f508SLoGin fn handle_logs(&self, logs_set: &mut LogSet<usize, ObjectWrapper<AllocatorLog>>) { 867b32f508SLoGin let mut last_cnt = 0; 877b32f508SLoGin let mut last_time = std::time::Instant::now(); 887b32f508SLoGin let mm_log_receiver = self.mm_log_receiver.lock().unwrap(); 897b32f508SLoGin loop { 907b32f508SLoGin let logs = mm_log_receiver.recv(); 917b32f508SLoGin if logs.is_err() { 927b32f508SLoGin return; 937b32f508SLoGin } 947b32f508SLoGin 957b32f508SLoGin let logs = logs.unwrap(); 967b32f508SLoGin 977b32f508SLoGin for log in logs.logs { 987b32f508SLoGin logs_set.insert(log.id as usize, log); 997b32f508SLoGin } 1007b32f508SLoGin 1017b32f508SLoGin let x = logs_set.len(); 1027b32f508SLoGin // info!("logs_set.len(): {}", x); 1037b32f508SLoGin let current_time = std::time::Instant::now(); 1047b32f508SLoGin if current_time.duration_since(last_time).as_secs() >= 1 { 1057b32f508SLoGin info!("memory log rate: {} logs/s", x - last_cnt); 1067b32f508SLoGin last_cnt = x; 1077b32f508SLoGin last_time = current_time; 1087b32f508SLoGin } 1097b32f508SLoGin } 1107b32f508SLoGin } 1117b32f508SLoGin 1127b32f508SLoGin // fn show_speed(&self, ) 1137b32f508SLoGin create_threads(&self)1147b32f508SLoGin fn create_threads(&self) { 1157b32f508SLoGin let km = self 1167b32f508SLoGin .shared_data 1177b32f508SLoGin .lock() 1187b32f508SLoGin .unwrap() 1197b32f508SLoGin .kmem_path 1207b32f508SLoGin .clone() 1217b32f508SLoGin .expect("DragonOS memory map file not specified."); 1227b32f508SLoGin let monitor_weak = self.self_ref.clone(); 1237b32f508SLoGin 1247b32f508SLoGin let handle = std::thread::spawn(move || { 1257b32f508SLoGin let mut monitor_thread = MMMonitorThread::new(monitor_weak, PathBuf::from(km)); 1267b32f508SLoGin monitor_thread.run(); 1277b32f508SLoGin }); 1287b32f508SLoGin 1297b32f508SLoGin self.threads.lock().unwrap().push(handle); 1307b32f508SLoGin } 1317b32f508SLoGin } 1327b32f508SLoGin 1337b32f508SLoGin #[derive(Debug)] 1347b32f508SLoGin struct MMMonitorThread { 1357b32f508SLoGin mm_log_monitor: Weak<MMLogMonitor>, 1367b32f508SLoGin kmem_path: PathBuf, 1377b32f508SLoGin } 1387b32f508SLoGin 1397b32f508SLoGin impl MMMonitorThread { 1407b32f508SLoGin /// Constructs a new instance of [`MMMonitorThread`]. 1417b32f508SLoGin /// 1427b32f508SLoGin /// ## Parameters 1437b32f508SLoGin /// 1447b32f508SLoGin /// - `mm_log_monitor`: The [`MMLogMonitor`] instance. 1457b32f508SLoGin /// - `kmem_path`: The path to the kernel memory file. new(mm_log_monitor: Weak<MMLogMonitor>, kmem_path: PathBuf) -> Self1467b32f508SLoGin pub fn new(mm_log_monitor: Weak<MMLogMonitor>, kmem_path: PathBuf) -> Self { 1477b32f508SLoGin Self { 1487b32f508SLoGin mm_log_monitor, 1497b32f508SLoGin kmem_path, 1507b32f508SLoGin } 1517b32f508SLoGin } 1527b32f508SLoGin run(&mut self)1537b32f508SLoGin pub fn run(&mut self) { 1547b32f508SLoGin info!("MMMonitorThread::run(): kmem_path: {:?}", self.kmem_path); 1557b32f508SLoGin 156dcf232f3SLoGin let mut kmem_file = { 157*816ee5aeSLoGin let file: File; 158dcf232f3SLoGin loop { 159dcf232f3SLoGin let f = self.open_kmem_file(); 160dcf232f3SLoGin if f.is_ok() { 161dcf232f3SLoGin file = f.unwrap(); 162dcf232f3SLoGin break; 163dcf232f3SLoGin } else { 164dcf232f3SLoGin log::error!("Failed to open kmem file, error: {:?}", f.unwrap_err()); 165dcf232f3SLoGin std::thread::sleep(std::time::Duration::from_secs(1)); 166dcf232f3SLoGin } 167dcf232f3SLoGin } 168dcf232f3SLoGin file 169dcf232f3SLoGin }; 1707b32f508SLoGin 1717b32f508SLoGin info!("Channel header loaded!"); 1727b32f508SLoGin 1737b32f508SLoGin let channel_header: ObjectWrapper<MMLogChannel<1>> = self.load_header(&mut kmem_file); 1747b32f508SLoGin 1757b32f508SLoGin // 循环读取 1767b32f508SLoGin 1777b32f508SLoGin self.process_logs(&mut kmem_file, &channel_header); 1787b32f508SLoGin } 1797b32f508SLoGin 1807b32f508SLoGin /// 处理内核内存分配日志 process_logs(&self, kmem_file: &mut File, channel_header: &ObjectWrapper<MMLogChannel<1>>)1817b32f508SLoGin fn process_logs(&self, kmem_file: &mut File, channel_header: &ObjectWrapper<MMLogChannel<1>>) { 1827b32f508SLoGin let cap = channel_header.capacity; 1837b32f508SLoGin let mut buf = vec![0u8; (cap * channel_header.slot_size as u64) as usize]; 1847b32f508SLoGin let symbol = self 1857b32f508SLoGin .mm_log_channel_symbol() 1867b32f508SLoGin .expect("Failed to get memory log channel symbol."); 1877b32f508SLoGin 1887b32f508SLoGin let sym_offset = symbol.memory_offset(); 1897b32f508SLoGin 1907b32f508SLoGin let slots_offset = channel_header.slots_offset + sym_offset; 1917b32f508SLoGin let sender = self.mm_log_monitor.upgrade().unwrap().mm_log_sender.clone(); 1927b32f508SLoGin loop { 1937b32f508SLoGin if self.should_stop() { 1947b32f508SLoGin break; 1957b32f508SLoGin } 1967b32f508SLoGin 1977b32f508SLoGin let r = kmem_file 1987b32f508SLoGin .read_at(&mut buf, slots_offset) 1997b32f508SLoGin .expect("Failed to read kmem file."); 2007b32f508SLoGin assert!(r == buf.len()); 2017b32f508SLoGin 2027b32f508SLoGin let mut logs = Vec::new(); 2037b32f508SLoGin 2047b32f508SLoGin for chunck in buf.chunks(channel_header.slot_size as usize) { 2057b32f508SLoGin let log_item = { 2067b32f508SLoGin let log: Option<ObjectWrapper<AllocatorLog>> = 2077b32f508SLoGin ObjectWrapper::new(&chunck[0..channel_header.element_size as usize]); 2087b32f508SLoGin let log: ObjectWrapper<AllocatorLog> = log.unwrap(); 2097b32f508SLoGin 2107b32f508SLoGin if log.is_valid() { 2117b32f508SLoGin Some(log) 2127b32f508SLoGin } else { 2137b32f508SLoGin None 2147b32f508SLoGin } 2157b32f508SLoGin }; 2167b32f508SLoGin if let Some(log_item) = log_item { 2177b32f508SLoGin logs.push(log_item); 2187b32f508SLoGin } 2197b32f508SLoGin } 2207b32f508SLoGin // 收集所有校验和正确的日志 2217b32f508SLoGin // info!("valid_cnt: {}, invalid_cnt: {}", valid_cnt, invalid_cnt); 2227b32f508SLoGin // info!("to send {} logs", logs.len()); 2237b32f508SLoGin if !logs.is_empty() { 2247b32f508SLoGin sender.send(MMLogWorkerResult::new(logs)).unwrap(); 2257b32f508SLoGin } 2267b32f508SLoGin } 2277b32f508SLoGin } 2287b32f508SLoGin open_kmem_file(&self) -> std::io::Result<std::fs::File>2297b32f508SLoGin fn open_kmem_file(&self) -> std::io::Result<std::fs::File> { 2307b32f508SLoGin std::fs::OpenOptions::new().read(true).open(&self.kmem_path) 2317b32f508SLoGin } 2327b32f508SLoGin load_header(&self, kmem_file: &mut File) -> ObjectWrapper<MMLogChannel<1>>2337b32f508SLoGin fn load_header(&self, kmem_file: &mut File) -> ObjectWrapper<MMLogChannel<1>> { 2347b32f508SLoGin let mut buf = [0u8; size_of::<MMLogChannel<1>>()]; 2357b32f508SLoGin let symbol = self 2367b32f508SLoGin .mm_log_channel_symbol() 2377b32f508SLoGin .expect("Failed to get memory log channel symbol."); 2387b32f508SLoGin 2397b32f508SLoGin let sym_offset = symbol.memory_offset(); 2407b32f508SLoGin 2417b32f508SLoGin let channel_header: Option<ObjectWrapper<MMLogChannel<1>>>; 2427b32f508SLoGin 2437b32f508SLoGin loop { 2447b32f508SLoGin let _r = kmem_file.read_at(&mut buf, sym_offset); 2457b32f508SLoGin 2467b32f508SLoGin let header: ObjectWrapper<MMLogChannel<1>> = 2477b32f508SLoGin ObjectWrapper::new(&buf).expect("Failed to parse MMLogChannel header."); 2487b32f508SLoGin if header.magic == MMLogChannel::<1>::MM_LOG_CHANNEL_MAGIC { 2497b32f508SLoGin info!("channel_header: {:?}", header); 2507b32f508SLoGin channel_header = Some(header); 2517b32f508SLoGin break; 2527b32f508SLoGin } else { 2537b32f508SLoGin info!("MM Log Channel not found... Maybe the kernel not started? Or the kernel version is not supported?"); 2547b32f508SLoGin } 2557b32f508SLoGin 2567b32f508SLoGin std::thread::sleep(std::time::Duration::from_secs(1)); 2577b32f508SLoGin } 2587b32f508SLoGin 2597b32f508SLoGin return channel_header.unwrap(); 2607b32f508SLoGin } 2617b32f508SLoGin 2627b32f508SLoGin /// Get the symbol of the memory log channel. mm_log_channel_symbol(&self) -> Option<Symbol>2637b32f508SLoGin fn mm_log_channel_symbol(&self) -> Option<Symbol> { 2647b32f508SLoGin self.mm_log_monitor 2657b32f508SLoGin .upgrade() 2667b32f508SLoGin .unwrap() 2677b32f508SLoGin .channel_symbol 2687b32f508SLoGin .clone() 2697b32f508SLoGin } 2707b32f508SLoGin 2717b32f508SLoGin /// Check if the monitor worker thread should stop. should_stop(&self) -> bool2727b32f508SLoGin fn should_stop(&self) -> bool { 2737b32f508SLoGin self.mm_log_monitor 2747b32f508SLoGin .upgrade() 2757b32f508SLoGin .map(|mm_log_monitor| { 2767b32f508SLoGin mm_log_monitor 2777b32f508SLoGin .stop_child_threads 2787b32f508SLoGin .load(std::sync::atomic::Ordering::Relaxed) 2797b32f508SLoGin }) 2807b32f508SLoGin .unwrap_or(true) 2817b32f508SLoGin } 2827b32f508SLoGin } 2837b32f508SLoGin 2847b32f508SLoGin /// 内存日志监视器工作线程处理的结果 2857b32f508SLoGin #[derive(Debug)] 2867b32f508SLoGin struct MMLogWorkerResult { 2877b32f508SLoGin logs: Vec<ObjectWrapper<AllocatorLog>>, 2887b32f508SLoGin } 2897b32f508SLoGin 2907b32f508SLoGin impl MMLogWorkerResult { 2917b32f508SLoGin /// 创建一个新的内存日志监视器工作线程处理的结果 2927b32f508SLoGin /// 2937b32f508SLoGin /// ## 参数 2947b32f508SLoGin /// 2957b32f508SLoGin /// - `logs`:处理的日志 new(logs: Vec<ObjectWrapper<AllocatorLog>>) -> Self2967b32f508SLoGin pub fn new(logs: Vec<ObjectWrapper<AllocatorLog>>) -> Self { 2977b32f508SLoGin Self { logs } 2987b32f508SLoGin } 2997b32f508SLoGin } 300