1 use std::{ 2 fs::File, 3 mem::size_of, 4 os::unix::prelude::FileExt, 5 path::PathBuf, 6 sync::{atomic::AtomicBool, mpsc, Arc, Mutex, Weak}, 7 thread::JoinHandle, 8 }; 9 10 use klog_types::{AllocatorLog, MMLogChannel}; 11 use log::info; 12 13 use crate::backend::{ 14 loader::Symbol, 15 monitor::{logset::LogSet, ObjectWrapper}, 16 BackendData, 17 }; 18 19 #[derive(Debug)] 20 pub struct MMLogMonitor { 21 channel_symbol: Option<Symbol>, 22 shared_data: Arc<Mutex<BackendData>>, 23 /// All threads spawned by the mm log monitor. 24 threads: Mutex<Vec<JoinHandle<()>>>, 25 stop_child_threads: AtomicBool, 26 self_ref: Weak<Self>, 27 28 mm_log_receiver: Mutex<mpsc::Receiver<MMLogWorkerResult>>, 29 mm_log_sender: mpsc::Sender<MMLogWorkerResult>, 30 } 31 32 impl MMLogMonitor { 33 pub(crate) fn new(shared_data: Arc<Mutex<BackendData>>) -> Arc<Self> { 34 let guard = shared_data.lock().unwrap(); 35 let mm_log_buffer_symbol: Option<Symbol> = guard 36 .kernel_metadata 37 .as_ref() 38 .map(|km| { 39 km.sym_collection() 40 .find_by_name("__MM_ALLOCATOR_LOG_CHANNEL") 41 .map(|s| s.clone()) 42 }) 43 .flatten(); 44 drop(guard); 45 46 info!("mm_log_buffer_symbol: {:?}", mm_log_buffer_symbol); 47 48 let mm_log_worker_mpsc: ( 49 mpsc::Sender<MMLogWorkerResult>, 50 mpsc::Receiver<MMLogWorkerResult>, 51 ) = mpsc::channel::<MMLogWorkerResult>(); 52 53 let r = Self { 54 channel_symbol: mm_log_buffer_symbol, 55 shared_data, 56 threads: Mutex::new(Vec::new()), 57 stop_child_threads: AtomicBool::new(false), 58 self_ref: Weak::new(), 59 mm_log_receiver: Mutex::new(mm_log_worker_mpsc.1), 60 mm_log_sender: mm_log_worker_mpsc.0, 61 }; 62 63 let r = Arc::new(r); 64 unsafe { 65 let self_ref = Arc::downgrade(&r); 66 let r_ptr = r.as_ref() as *const Self as *mut Self; 67 (*r_ptr).self_ref = self_ref; 68 } 69 70 return r; 71 } 72 73 pub fn run(&self) { 74 info!("MMLogMonitor::run()"); 75 76 self.create_threads(); 77 78 let mut logs_set = 79 LogSet::<usize, ObjectWrapper<AllocatorLog>>::new("mm_allocator_log".to_string(), None); 80 81 self.handle_logs(&mut logs_set); 82 // std::thread::sleep(std::time::Duration::from_micros(50)); 83 } 84 85 fn handle_logs(&self, logs_set: &mut LogSet<usize, ObjectWrapper<AllocatorLog>>) { 86 let mut last_cnt = 0; 87 let mut last_time = std::time::Instant::now(); 88 let mm_log_receiver = self.mm_log_receiver.lock().unwrap(); 89 loop { 90 let logs = mm_log_receiver.recv(); 91 if logs.is_err() { 92 return; 93 } 94 95 let logs = logs.unwrap(); 96 97 for log in logs.logs { 98 logs_set.insert(log.id as usize, log); 99 } 100 101 let x = logs_set.len(); 102 // info!("logs_set.len(): {}", x); 103 let current_time = std::time::Instant::now(); 104 if current_time.duration_since(last_time).as_secs() >= 1 { 105 info!("memory log rate: {} logs/s", x - last_cnt); 106 last_cnt = x; 107 last_time = current_time; 108 } 109 } 110 } 111 112 // fn show_speed(&self, ) 113 114 fn create_threads(&self) { 115 let km = self 116 .shared_data 117 .lock() 118 .unwrap() 119 .kmem_path 120 .clone() 121 .expect("DragonOS memory map file not specified."); 122 let monitor_weak = self.self_ref.clone(); 123 124 let handle = std::thread::spawn(move || { 125 let mut monitor_thread = MMMonitorThread::new(monitor_weak, PathBuf::from(km)); 126 monitor_thread.run(); 127 }); 128 129 self.threads.lock().unwrap().push(handle); 130 } 131 } 132 133 #[derive(Debug)] 134 struct MMMonitorThread { 135 mm_log_monitor: Weak<MMLogMonitor>, 136 kmem_path: PathBuf, 137 } 138 139 impl MMMonitorThread { 140 /// Constructs a new instance of [`MMMonitorThread`]. 141 /// 142 /// ## Parameters 143 /// 144 /// - `mm_log_monitor`: The [`MMLogMonitor`] instance. 145 /// - `kmem_path`: The path to the kernel memory file. 146 pub fn new(mm_log_monitor: Weak<MMLogMonitor>, kmem_path: PathBuf) -> Self { 147 Self { 148 mm_log_monitor, 149 kmem_path, 150 } 151 } 152 153 pub fn run(&mut self) { 154 info!("MMMonitorThread::run(): kmem_path: {:?}", self.kmem_path); 155 156 let mut kmem_file = { 157 let file: File; 158 loop { 159 let f = self.open_kmem_file(); 160 if f.is_ok() { 161 file = f.unwrap(); 162 break; 163 } else { 164 log::error!("Failed to open kmem file, error: {:?}", f.unwrap_err()); 165 std::thread::sleep(std::time::Duration::from_secs(1)); 166 } 167 } 168 file 169 }; 170 171 info!("Channel header loaded!"); 172 173 let channel_header: ObjectWrapper<MMLogChannel<1>> = self.load_header(&mut kmem_file); 174 175 // 循环读取 176 177 self.process_logs(&mut kmem_file, &channel_header); 178 } 179 180 /// 处理内核内存分配日志 181 fn process_logs(&self, kmem_file: &mut File, channel_header: &ObjectWrapper<MMLogChannel<1>>) { 182 let cap = channel_header.capacity; 183 let mut buf = vec![0u8; (cap * channel_header.slot_size as u64) as usize]; 184 let symbol = self 185 .mm_log_channel_symbol() 186 .expect("Failed to get memory log channel symbol."); 187 188 let sym_offset = symbol.memory_offset(); 189 190 let slots_offset = channel_header.slots_offset + sym_offset; 191 let sender = self.mm_log_monitor.upgrade().unwrap().mm_log_sender.clone(); 192 loop { 193 if self.should_stop() { 194 break; 195 } 196 197 let r = kmem_file 198 .read_at(&mut buf, slots_offset) 199 .expect("Failed to read kmem file."); 200 assert!(r == buf.len()); 201 202 let mut logs = Vec::new(); 203 204 for chunck in buf.chunks(channel_header.slot_size as usize) { 205 let log_item = { 206 let log: Option<ObjectWrapper<AllocatorLog>> = 207 ObjectWrapper::new(&chunck[0..channel_header.element_size as usize]); 208 let log: ObjectWrapper<AllocatorLog> = log.unwrap(); 209 210 if log.is_valid() { 211 Some(log) 212 } else { 213 None 214 } 215 }; 216 if let Some(log_item) = log_item { 217 logs.push(log_item); 218 } 219 } 220 // 收集所有校验和正确的日志 221 // info!("valid_cnt: {}, invalid_cnt: {}", valid_cnt, invalid_cnt); 222 // info!("to send {} logs", logs.len()); 223 if !logs.is_empty() { 224 sender.send(MMLogWorkerResult::new(logs)).unwrap(); 225 } 226 } 227 } 228 229 fn open_kmem_file(&self) -> std::io::Result<std::fs::File> { 230 std::fs::OpenOptions::new().read(true).open(&self.kmem_path) 231 } 232 233 fn load_header(&self, kmem_file: &mut File) -> ObjectWrapper<MMLogChannel<1>> { 234 let mut buf = [0u8; size_of::<MMLogChannel<1>>()]; 235 let symbol = self 236 .mm_log_channel_symbol() 237 .expect("Failed to get memory log channel symbol."); 238 239 let sym_offset = symbol.memory_offset(); 240 241 let channel_header: Option<ObjectWrapper<MMLogChannel<1>>>; 242 243 loop { 244 let _r = kmem_file.read_at(&mut buf, sym_offset); 245 246 let header: ObjectWrapper<MMLogChannel<1>> = 247 ObjectWrapper::new(&buf).expect("Failed to parse MMLogChannel header."); 248 if header.magic == MMLogChannel::<1>::MM_LOG_CHANNEL_MAGIC { 249 info!("channel_header: {:?}", header); 250 channel_header = Some(header); 251 break; 252 } else { 253 info!("MM Log Channel not found... Maybe the kernel not started? Or the kernel version is not supported?"); 254 } 255 256 std::thread::sleep(std::time::Duration::from_secs(1)); 257 } 258 259 return channel_header.unwrap(); 260 } 261 262 /// Get the symbol of the memory log channel. 263 fn mm_log_channel_symbol(&self) -> Option<Symbol> { 264 self.mm_log_monitor 265 .upgrade() 266 .unwrap() 267 .channel_symbol 268 .clone() 269 } 270 271 /// Check if the monitor worker thread should stop. 272 fn should_stop(&self) -> bool { 273 self.mm_log_monitor 274 .upgrade() 275 .map(|mm_log_monitor| { 276 mm_log_monitor 277 .stop_child_threads 278 .load(std::sync::atomic::Ordering::Relaxed) 279 }) 280 .unwrap_or(true) 281 } 282 } 283 284 /// 内存日志监视器工作线程处理的结果 285 #[derive(Debug)] 286 struct MMLogWorkerResult { 287 logs: Vec<ObjectWrapper<AllocatorLog>>, 288 } 289 290 impl MMLogWorkerResult { 291 /// 创建一个新的内存日志监视器工作线程处理的结果 292 /// 293 /// ## 参数 294 /// 295 /// - `logs`:处理的日志 296 pub fn new(logs: Vec<ObjectWrapper<AllocatorLog>>) -> Self { 297 Self { logs } 298 } 299 } 300