1*7b32f508SLoGin use std::{ 2*7b32f508SLoGin fs::File, 3*7b32f508SLoGin mem::size_of, 4*7b32f508SLoGin os::unix::prelude::FileExt, 5*7b32f508SLoGin path::PathBuf, 6*7b32f508SLoGin sync::{atomic::AtomicBool, mpsc, Arc, Mutex, Weak}, 7*7b32f508SLoGin thread::JoinHandle, 8*7b32f508SLoGin }; 9*7b32f508SLoGin 10*7b32f508SLoGin use klog_types::{AllocatorLog, MMLogChannel}; 11*7b32f508SLoGin use log::info; 12*7b32f508SLoGin 13*7b32f508SLoGin use crate::backend::{ 14*7b32f508SLoGin loader::Symbol, 15*7b32f508SLoGin monitor::{logset::LogSet, ObjectWrapper}, 16*7b32f508SLoGin BackendData, 17*7b32f508SLoGin }; 18*7b32f508SLoGin 19*7b32f508SLoGin #[derive(Debug)] 20*7b32f508SLoGin pub struct MMLogMonitor { 21*7b32f508SLoGin channel_symbol: Option<Symbol>, 22*7b32f508SLoGin shared_data: Arc<Mutex<BackendData>>, 23*7b32f508SLoGin /// All threads spawned by the mm log monitor. 24*7b32f508SLoGin threads: Mutex<Vec<JoinHandle<()>>>, 25*7b32f508SLoGin stop_child_threads: AtomicBool, 26*7b32f508SLoGin self_ref: Weak<Self>, 27*7b32f508SLoGin 28*7b32f508SLoGin mm_log_receiver: Mutex<mpsc::Receiver<MMLogWorkerResult>>, 29*7b32f508SLoGin mm_log_sender: mpsc::Sender<MMLogWorkerResult>, 30*7b32f508SLoGin } 31*7b32f508SLoGin 32*7b32f508SLoGin impl MMLogMonitor { 33*7b32f508SLoGin pub fn new(shared_data: Arc<Mutex<BackendData>>) -> Arc<Self> { 34*7b32f508SLoGin let guard = shared_data.lock().unwrap(); 35*7b32f508SLoGin let mm_log_buffer_symbol: Option<Symbol> = guard 36*7b32f508SLoGin .kernel_metadata 37*7b32f508SLoGin .as_ref() 38*7b32f508SLoGin .map(|km| { 39*7b32f508SLoGin km.sym_collection() 40*7b32f508SLoGin .find_by_name("__MM_ALLOCATOR_LOG_CHANNEL") 41*7b32f508SLoGin .map(|s| s.clone()) 42*7b32f508SLoGin }) 43*7b32f508SLoGin .flatten(); 44*7b32f508SLoGin drop(guard); 45*7b32f508SLoGin 46*7b32f508SLoGin info!("mm_log_buffer_symbol: {:?}", mm_log_buffer_symbol); 47*7b32f508SLoGin 48*7b32f508SLoGin let mm_log_worker_mpsc: ( 49*7b32f508SLoGin mpsc::Sender<MMLogWorkerResult>, 50*7b32f508SLoGin mpsc::Receiver<MMLogWorkerResult>, 51*7b32f508SLoGin ) = mpsc::channel::<MMLogWorkerResult>(); 52*7b32f508SLoGin 53*7b32f508SLoGin let r = Self { 54*7b32f508SLoGin channel_symbol: mm_log_buffer_symbol, 55*7b32f508SLoGin shared_data, 56*7b32f508SLoGin threads: Mutex::new(Vec::new()), 57*7b32f508SLoGin stop_child_threads: AtomicBool::new(false), 58*7b32f508SLoGin self_ref: Weak::new(), 59*7b32f508SLoGin mm_log_receiver: Mutex::new(mm_log_worker_mpsc.1), 60*7b32f508SLoGin mm_log_sender: mm_log_worker_mpsc.0, 61*7b32f508SLoGin }; 62*7b32f508SLoGin 63*7b32f508SLoGin let r = Arc::new(r); 64*7b32f508SLoGin unsafe { 65*7b32f508SLoGin let self_ref = Arc::downgrade(&r); 66*7b32f508SLoGin let r_ptr = r.as_ref() as *const Self as *mut Self; 67*7b32f508SLoGin (*r_ptr).self_ref = self_ref; 68*7b32f508SLoGin } 69*7b32f508SLoGin 70*7b32f508SLoGin return r; 71*7b32f508SLoGin } 72*7b32f508SLoGin 73*7b32f508SLoGin pub fn run(&self) { 74*7b32f508SLoGin info!("MMLogMonitor::run()"); 75*7b32f508SLoGin 76*7b32f508SLoGin self.create_threads(); 77*7b32f508SLoGin 78*7b32f508SLoGin let mut logs_set = 79*7b32f508SLoGin LogSet::<usize, ObjectWrapper<AllocatorLog>>::new("mm_allocator_log".to_string(), None); 80*7b32f508SLoGin 81*7b32f508SLoGin self.handle_logs(&mut logs_set); 82*7b32f508SLoGin // std::thread::sleep(std::time::Duration::from_micros(50)); 83*7b32f508SLoGin } 84*7b32f508SLoGin 85*7b32f508SLoGin fn handle_logs(&self, logs_set: &mut LogSet<usize, ObjectWrapper<AllocatorLog>>) { 86*7b32f508SLoGin let mut last_cnt = 0; 87*7b32f508SLoGin let mut last_time = std::time::Instant::now(); 88*7b32f508SLoGin let mm_log_receiver = self.mm_log_receiver.lock().unwrap(); 89*7b32f508SLoGin loop { 90*7b32f508SLoGin let logs = mm_log_receiver.recv(); 91*7b32f508SLoGin if logs.is_err() { 92*7b32f508SLoGin return; 93*7b32f508SLoGin } 94*7b32f508SLoGin 95*7b32f508SLoGin let logs = logs.unwrap(); 96*7b32f508SLoGin 97*7b32f508SLoGin for log in logs.logs { 98*7b32f508SLoGin logs_set.insert(log.id as usize, log); 99*7b32f508SLoGin } 100*7b32f508SLoGin 101*7b32f508SLoGin let x = logs_set.len(); 102*7b32f508SLoGin // info!("logs_set.len(): {}", x); 103*7b32f508SLoGin let current_time = std::time::Instant::now(); 104*7b32f508SLoGin if current_time.duration_since(last_time).as_secs() >= 1 { 105*7b32f508SLoGin info!("memory log rate: {} logs/s", x - last_cnt); 106*7b32f508SLoGin last_cnt = x; 107*7b32f508SLoGin last_time = current_time; 108*7b32f508SLoGin } 109*7b32f508SLoGin } 110*7b32f508SLoGin } 111*7b32f508SLoGin 112*7b32f508SLoGin // fn show_speed(&self, ) 113*7b32f508SLoGin 114*7b32f508SLoGin fn create_threads(&self) { 115*7b32f508SLoGin let km = self 116*7b32f508SLoGin .shared_data 117*7b32f508SLoGin .lock() 118*7b32f508SLoGin .unwrap() 119*7b32f508SLoGin .kmem_path 120*7b32f508SLoGin .clone() 121*7b32f508SLoGin .expect("DragonOS memory map file not specified."); 122*7b32f508SLoGin let monitor_weak = self.self_ref.clone(); 123*7b32f508SLoGin 124*7b32f508SLoGin let handle = std::thread::spawn(move || { 125*7b32f508SLoGin let mut monitor_thread = MMMonitorThread::new(monitor_weak, PathBuf::from(km)); 126*7b32f508SLoGin monitor_thread.run(); 127*7b32f508SLoGin }); 128*7b32f508SLoGin 129*7b32f508SLoGin self.threads.lock().unwrap().push(handle); 130*7b32f508SLoGin } 131*7b32f508SLoGin } 132*7b32f508SLoGin 133*7b32f508SLoGin #[derive(Debug)] 134*7b32f508SLoGin struct MMMonitorThread { 135*7b32f508SLoGin mm_log_monitor: Weak<MMLogMonitor>, 136*7b32f508SLoGin kmem_path: PathBuf, 137*7b32f508SLoGin } 138*7b32f508SLoGin 139*7b32f508SLoGin impl MMMonitorThread { 140*7b32f508SLoGin /// Constructs a new instance of [`MMMonitorThread`]. 141*7b32f508SLoGin /// 142*7b32f508SLoGin /// ## Parameters 143*7b32f508SLoGin /// 144*7b32f508SLoGin /// - `mm_log_monitor`: The [`MMLogMonitor`] instance. 145*7b32f508SLoGin /// - `kmem_path`: The path to the kernel memory file. 146*7b32f508SLoGin pub fn new(mm_log_monitor: Weak<MMLogMonitor>, kmem_path: PathBuf) -> Self { 147*7b32f508SLoGin Self { 148*7b32f508SLoGin mm_log_monitor, 149*7b32f508SLoGin kmem_path, 150*7b32f508SLoGin } 151*7b32f508SLoGin } 152*7b32f508SLoGin 153*7b32f508SLoGin pub fn run(&mut self) { 154*7b32f508SLoGin info!("MMMonitorThread::run(): kmem_path: {:?}", self.kmem_path); 155*7b32f508SLoGin 156*7b32f508SLoGin let mut kmem_file = self.open_kmem_file().expect("Failed to open kmem file."); 157*7b32f508SLoGin 158*7b32f508SLoGin info!("Channel header loaded!"); 159*7b32f508SLoGin 160*7b32f508SLoGin let channel_header: ObjectWrapper<MMLogChannel<1>> = self.load_header(&mut kmem_file); 161*7b32f508SLoGin 162*7b32f508SLoGin // 循环读取 163*7b32f508SLoGin 164*7b32f508SLoGin self.process_logs(&mut kmem_file, &channel_header); 165*7b32f508SLoGin } 166*7b32f508SLoGin 167*7b32f508SLoGin /// 处理内核内存分配日志 168*7b32f508SLoGin fn process_logs(&self, kmem_file: &mut File, channel_header: &ObjectWrapper<MMLogChannel<1>>) { 169*7b32f508SLoGin let cap = channel_header.capacity; 170*7b32f508SLoGin let mut buf = vec![0u8; (cap * channel_header.slot_size as u64) as usize]; 171*7b32f508SLoGin let symbol = self 172*7b32f508SLoGin .mm_log_channel_symbol() 173*7b32f508SLoGin .expect("Failed to get memory log channel symbol."); 174*7b32f508SLoGin 175*7b32f508SLoGin let sym_offset = symbol.memory_offset(); 176*7b32f508SLoGin 177*7b32f508SLoGin let slots_offset = channel_header.slots_offset + sym_offset; 178*7b32f508SLoGin let sender = self.mm_log_monitor.upgrade().unwrap().mm_log_sender.clone(); 179*7b32f508SLoGin loop { 180*7b32f508SLoGin if self.should_stop() { 181*7b32f508SLoGin break; 182*7b32f508SLoGin } 183*7b32f508SLoGin 184*7b32f508SLoGin let r = kmem_file 185*7b32f508SLoGin .read_at(&mut buf, slots_offset) 186*7b32f508SLoGin .expect("Failed to read kmem file."); 187*7b32f508SLoGin assert!(r == buf.len()); 188*7b32f508SLoGin 189*7b32f508SLoGin let mut logs = Vec::new(); 190*7b32f508SLoGin 191*7b32f508SLoGin for chunck in buf.chunks(channel_header.slot_size as usize) { 192*7b32f508SLoGin let log_item = { 193*7b32f508SLoGin let log: Option<ObjectWrapper<AllocatorLog>> = 194*7b32f508SLoGin ObjectWrapper::new(&chunck[0..channel_header.element_size as usize]); 195*7b32f508SLoGin let log: ObjectWrapper<AllocatorLog> = log.unwrap(); 196*7b32f508SLoGin 197*7b32f508SLoGin if log.is_valid() { 198*7b32f508SLoGin Some(log) 199*7b32f508SLoGin } else { 200*7b32f508SLoGin None 201*7b32f508SLoGin } 202*7b32f508SLoGin }; 203*7b32f508SLoGin if let Some(log_item) = log_item { 204*7b32f508SLoGin logs.push(log_item); 205*7b32f508SLoGin } 206*7b32f508SLoGin } 207*7b32f508SLoGin // 收集所有校验和正确的日志 208*7b32f508SLoGin // info!("valid_cnt: {}, invalid_cnt: {}", valid_cnt, invalid_cnt); 209*7b32f508SLoGin // info!("to send {} logs", logs.len()); 210*7b32f508SLoGin if !logs.is_empty() { 211*7b32f508SLoGin sender.send(MMLogWorkerResult::new(logs)).unwrap(); 212*7b32f508SLoGin } 213*7b32f508SLoGin } 214*7b32f508SLoGin } 215*7b32f508SLoGin 216*7b32f508SLoGin fn open_kmem_file(&self) -> std::io::Result<std::fs::File> { 217*7b32f508SLoGin std::fs::OpenOptions::new().read(true).open(&self.kmem_path) 218*7b32f508SLoGin } 219*7b32f508SLoGin 220*7b32f508SLoGin fn load_header(&self, kmem_file: &mut File) -> ObjectWrapper<MMLogChannel<1>> { 221*7b32f508SLoGin let mut buf = [0u8; size_of::<MMLogChannel<1>>()]; 222*7b32f508SLoGin let symbol = self 223*7b32f508SLoGin .mm_log_channel_symbol() 224*7b32f508SLoGin .expect("Failed to get memory log channel symbol."); 225*7b32f508SLoGin 226*7b32f508SLoGin let sym_offset = symbol.memory_offset(); 227*7b32f508SLoGin 228*7b32f508SLoGin let channel_header: Option<ObjectWrapper<MMLogChannel<1>>>; 229*7b32f508SLoGin 230*7b32f508SLoGin loop { 231*7b32f508SLoGin let _r = kmem_file.read_at(&mut buf, sym_offset); 232*7b32f508SLoGin 233*7b32f508SLoGin let header: ObjectWrapper<MMLogChannel<1>> = 234*7b32f508SLoGin ObjectWrapper::new(&buf).expect("Failed to parse MMLogChannel header."); 235*7b32f508SLoGin if header.magic == MMLogChannel::<1>::MM_LOG_CHANNEL_MAGIC { 236*7b32f508SLoGin info!("channel_header: {:?}", header); 237*7b32f508SLoGin channel_header = Some(header); 238*7b32f508SLoGin break; 239*7b32f508SLoGin } else { 240*7b32f508SLoGin info!("MM Log Channel not found... Maybe the kernel not started? Or the kernel version is not supported?"); 241*7b32f508SLoGin } 242*7b32f508SLoGin 243*7b32f508SLoGin std::thread::sleep(std::time::Duration::from_secs(1)); 244*7b32f508SLoGin } 245*7b32f508SLoGin 246*7b32f508SLoGin return channel_header.unwrap(); 247*7b32f508SLoGin } 248*7b32f508SLoGin 249*7b32f508SLoGin /// Get the symbol of the memory log channel. 250*7b32f508SLoGin fn mm_log_channel_symbol(&self) -> Option<Symbol> { 251*7b32f508SLoGin self.mm_log_monitor 252*7b32f508SLoGin .upgrade() 253*7b32f508SLoGin .unwrap() 254*7b32f508SLoGin .channel_symbol 255*7b32f508SLoGin .clone() 256*7b32f508SLoGin } 257*7b32f508SLoGin 258*7b32f508SLoGin /// Check if the monitor worker thread should stop. 259*7b32f508SLoGin fn should_stop(&self) -> bool { 260*7b32f508SLoGin self.mm_log_monitor 261*7b32f508SLoGin .upgrade() 262*7b32f508SLoGin .map(|mm_log_monitor| { 263*7b32f508SLoGin mm_log_monitor 264*7b32f508SLoGin .stop_child_threads 265*7b32f508SLoGin .load(std::sync::atomic::Ordering::Relaxed) 266*7b32f508SLoGin }) 267*7b32f508SLoGin .unwrap_or(true) 268*7b32f508SLoGin } 269*7b32f508SLoGin } 270*7b32f508SLoGin 271*7b32f508SLoGin /// 内存日志监视器工作线程处理的结果 272*7b32f508SLoGin #[derive(Debug)] 273*7b32f508SLoGin struct MMLogWorkerResult { 274*7b32f508SLoGin logs: Vec<ObjectWrapper<AllocatorLog>>, 275*7b32f508SLoGin } 276*7b32f508SLoGin 277*7b32f508SLoGin impl MMLogWorkerResult { 278*7b32f508SLoGin /// 创建一个新的内存日志监视器工作线程处理的结果 279*7b32f508SLoGin /// 280*7b32f508SLoGin /// ## 参数 281*7b32f508SLoGin /// 282*7b32f508SLoGin /// - `logs`:处理的日志 283*7b32f508SLoGin pub fn new(logs: Vec<ObjectWrapper<AllocatorLog>>) -> Self { 284*7b32f508SLoGin Self { logs } 285*7b32f508SLoGin } 286*7b32f508SLoGin } 287