xref: /DragonOS/tools/debugging/logmonitor/src/backend/monitor/mm.rs (revision 7b32f5080f42bcbf7d2421013f3ea53c776a063c)
1*7b32f508SLoGin use std::{
2*7b32f508SLoGin     fs::File,
3*7b32f508SLoGin     mem::size_of,
4*7b32f508SLoGin     os::unix::prelude::FileExt,
5*7b32f508SLoGin     path::PathBuf,
6*7b32f508SLoGin     sync::{atomic::AtomicBool, mpsc, Arc, Mutex, Weak},
7*7b32f508SLoGin     thread::JoinHandle,
8*7b32f508SLoGin };
9*7b32f508SLoGin 
10*7b32f508SLoGin use klog_types::{AllocatorLog, MMLogChannel};
11*7b32f508SLoGin use log::info;
12*7b32f508SLoGin 
13*7b32f508SLoGin use crate::backend::{
14*7b32f508SLoGin     loader::Symbol,
15*7b32f508SLoGin     monitor::{logset::LogSet, ObjectWrapper},
16*7b32f508SLoGin     BackendData,
17*7b32f508SLoGin };
18*7b32f508SLoGin 
19*7b32f508SLoGin #[derive(Debug)]
20*7b32f508SLoGin pub struct MMLogMonitor {
21*7b32f508SLoGin     channel_symbol: Option<Symbol>,
22*7b32f508SLoGin     shared_data: Arc<Mutex<BackendData>>,
23*7b32f508SLoGin     /// All threads spawned by the mm log monitor.
24*7b32f508SLoGin     threads: Mutex<Vec<JoinHandle<()>>>,
25*7b32f508SLoGin     stop_child_threads: AtomicBool,
26*7b32f508SLoGin     self_ref: Weak<Self>,
27*7b32f508SLoGin 
28*7b32f508SLoGin     mm_log_receiver: Mutex<mpsc::Receiver<MMLogWorkerResult>>,
29*7b32f508SLoGin     mm_log_sender: mpsc::Sender<MMLogWorkerResult>,
30*7b32f508SLoGin }
31*7b32f508SLoGin 
32*7b32f508SLoGin impl MMLogMonitor {
33*7b32f508SLoGin     pub fn new(shared_data: Arc<Mutex<BackendData>>) -> Arc<Self> {
34*7b32f508SLoGin         let guard = shared_data.lock().unwrap();
35*7b32f508SLoGin         let mm_log_buffer_symbol: Option<Symbol> = guard
36*7b32f508SLoGin             .kernel_metadata
37*7b32f508SLoGin             .as_ref()
38*7b32f508SLoGin             .map(|km| {
39*7b32f508SLoGin                 km.sym_collection()
40*7b32f508SLoGin                     .find_by_name("__MM_ALLOCATOR_LOG_CHANNEL")
41*7b32f508SLoGin                     .map(|s| s.clone())
42*7b32f508SLoGin             })
43*7b32f508SLoGin             .flatten();
44*7b32f508SLoGin         drop(guard);
45*7b32f508SLoGin 
46*7b32f508SLoGin         info!("mm_log_buffer_symbol: {:?}", mm_log_buffer_symbol);
47*7b32f508SLoGin 
48*7b32f508SLoGin         let mm_log_worker_mpsc: (
49*7b32f508SLoGin             mpsc::Sender<MMLogWorkerResult>,
50*7b32f508SLoGin             mpsc::Receiver<MMLogWorkerResult>,
51*7b32f508SLoGin         ) = mpsc::channel::<MMLogWorkerResult>();
52*7b32f508SLoGin 
53*7b32f508SLoGin         let r = Self {
54*7b32f508SLoGin             channel_symbol: mm_log_buffer_symbol,
55*7b32f508SLoGin             shared_data,
56*7b32f508SLoGin             threads: Mutex::new(Vec::new()),
57*7b32f508SLoGin             stop_child_threads: AtomicBool::new(false),
58*7b32f508SLoGin             self_ref: Weak::new(),
59*7b32f508SLoGin             mm_log_receiver: Mutex::new(mm_log_worker_mpsc.1),
60*7b32f508SLoGin             mm_log_sender: mm_log_worker_mpsc.0,
61*7b32f508SLoGin         };
62*7b32f508SLoGin 
63*7b32f508SLoGin         let r = Arc::new(r);
64*7b32f508SLoGin         unsafe {
65*7b32f508SLoGin             let self_ref = Arc::downgrade(&r);
66*7b32f508SLoGin             let r_ptr = r.as_ref() as *const Self as *mut Self;
67*7b32f508SLoGin             (*r_ptr).self_ref = self_ref;
68*7b32f508SLoGin         }
69*7b32f508SLoGin 
70*7b32f508SLoGin         return r;
71*7b32f508SLoGin     }
72*7b32f508SLoGin 
73*7b32f508SLoGin     pub fn run(&self) {
74*7b32f508SLoGin         info!("MMLogMonitor::run()");
75*7b32f508SLoGin 
76*7b32f508SLoGin         self.create_threads();
77*7b32f508SLoGin 
78*7b32f508SLoGin         let mut logs_set =
79*7b32f508SLoGin             LogSet::<usize, ObjectWrapper<AllocatorLog>>::new("mm_allocator_log".to_string(), None);
80*7b32f508SLoGin 
81*7b32f508SLoGin         self.handle_logs(&mut logs_set);
82*7b32f508SLoGin         // std::thread::sleep(std::time::Duration::from_micros(50));
83*7b32f508SLoGin     }
84*7b32f508SLoGin 
85*7b32f508SLoGin     fn handle_logs(&self, logs_set: &mut LogSet<usize, ObjectWrapper<AllocatorLog>>) {
86*7b32f508SLoGin         let mut last_cnt = 0;
87*7b32f508SLoGin         let mut last_time = std::time::Instant::now();
88*7b32f508SLoGin         let mm_log_receiver = self.mm_log_receiver.lock().unwrap();
89*7b32f508SLoGin         loop {
90*7b32f508SLoGin             let logs = mm_log_receiver.recv();
91*7b32f508SLoGin             if logs.is_err() {
92*7b32f508SLoGin                 return;
93*7b32f508SLoGin             }
94*7b32f508SLoGin 
95*7b32f508SLoGin             let logs = logs.unwrap();
96*7b32f508SLoGin 
97*7b32f508SLoGin             for log in logs.logs {
98*7b32f508SLoGin                 logs_set.insert(log.id as usize, log);
99*7b32f508SLoGin             }
100*7b32f508SLoGin 
101*7b32f508SLoGin             let x = logs_set.len();
102*7b32f508SLoGin             // info!("logs_set.len(): {}", x);
103*7b32f508SLoGin             let current_time = std::time::Instant::now();
104*7b32f508SLoGin             if current_time.duration_since(last_time).as_secs() >= 1 {
105*7b32f508SLoGin                 info!("memory log rate: {} logs/s", x - last_cnt);
106*7b32f508SLoGin                 last_cnt = x;
107*7b32f508SLoGin                 last_time = current_time;
108*7b32f508SLoGin             }
109*7b32f508SLoGin         }
110*7b32f508SLoGin     }
111*7b32f508SLoGin 
112*7b32f508SLoGin     // fn show_speed(&self, )
113*7b32f508SLoGin 
114*7b32f508SLoGin     fn create_threads(&self) {
115*7b32f508SLoGin         let km = self
116*7b32f508SLoGin             .shared_data
117*7b32f508SLoGin             .lock()
118*7b32f508SLoGin             .unwrap()
119*7b32f508SLoGin             .kmem_path
120*7b32f508SLoGin             .clone()
121*7b32f508SLoGin             .expect("DragonOS memory map file not specified.");
122*7b32f508SLoGin         let monitor_weak = self.self_ref.clone();
123*7b32f508SLoGin 
124*7b32f508SLoGin         let handle = std::thread::spawn(move || {
125*7b32f508SLoGin             let mut monitor_thread = MMMonitorThread::new(monitor_weak, PathBuf::from(km));
126*7b32f508SLoGin             monitor_thread.run();
127*7b32f508SLoGin         });
128*7b32f508SLoGin 
129*7b32f508SLoGin         self.threads.lock().unwrap().push(handle);
130*7b32f508SLoGin     }
131*7b32f508SLoGin }
132*7b32f508SLoGin 
133*7b32f508SLoGin #[derive(Debug)]
134*7b32f508SLoGin struct MMMonitorThread {
135*7b32f508SLoGin     mm_log_monitor: Weak<MMLogMonitor>,
136*7b32f508SLoGin     kmem_path: PathBuf,
137*7b32f508SLoGin }
138*7b32f508SLoGin 
139*7b32f508SLoGin impl MMMonitorThread {
140*7b32f508SLoGin     /// Constructs a new instance of [`MMMonitorThread`].
141*7b32f508SLoGin     ///
142*7b32f508SLoGin     /// ## Parameters
143*7b32f508SLoGin     ///
144*7b32f508SLoGin     /// - `mm_log_monitor`: The [`MMLogMonitor`] instance.
145*7b32f508SLoGin     /// - `kmem_path`: The path to the kernel memory file.
146*7b32f508SLoGin     pub fn new(mm_log_monitor: Weak<MMLogMonitor>, kmem_path: PathBuf) -> Self {
147*7b32f508SLoGin         Self {
148*7b32f508SLoGin             mm_log_monitor,
149*7b32f508SLoGin             kmem_path,
150*7b32f508SLoGin         }
151*7b32f508SLoGin     }
152*7b32f508SLoGin 
153*7b32f508SLoGin     pub fn run(&mut self) {
154*7b32f508SLoGin         info!("MMMonitorThread::run(): kmem_path: {:?}", self.kmem_path);
155*7b32f508SLoGin 
156*7b32f508SLoGin         let mut kmem_file = self.open_kmem_file().expect("Failed to open kmem file.");
157*7b32f508SLoGin 
158*7b32f508SLoGin         info!("Channel header loaded!");
159*7b32f508SLoGin 
160*7b32f508SLoGin         let channel_header: ObjectWrapper<MMLogChannel<1>> = self.load_header(&mut kmem_file);
161*7b32f508SLoGin 
162*7b32f508SLoGin         // 循环读取
163*7b32f508SLoGin 
164*7b32f508SLoGin         self.process_logs(&mut kmem_file, &channel_header);
165*7b32f508SLoGin     }
166*7b32f508SLoGin 
167*7b32f508SLoGin     /// 处理内核内存分配日志
168*7b32f508SLoGin     fn process_logs(&self, kmem_file: &mut File, channel_header: &ObjectWrapper<MMLogChannel<1>>) {
169*7b32f508SLoGin         let cap = channel_header.capacity;
170*7b32f508SLoGin         let mut buf = vec![0u8; (cap * channel_header.slot_size as u64) as usize];
171*7b32f508SLoGin         let symbol = self
172*7b32f508SLoGin             .mm_log_channel_symbol()
173*7b32f508SLoGin             .expect("Failed to get memory log channel symbol.");
174*7b32f508SLoGin 
175*7b32f508SLoGin         let sym_offset = symbol.memory_offset();
176*7b32f508SLoGin 
177*7b32f508SLoGin         let slots_offset = channel_header.slots_offset + sym_offset;
178*7b32f508SLoGin         let sender = self.mm_log_monitor.upgrade().unwrap().mm_log_sender.clone();
179*7b32f508SLoGin         loop {
180*7b32f508SLoGin             if self.should_stop() {
181*7b32f508SLoGin                 break;
182*7b32f508SLoGin             }
183*7b32f508SLoGin 
184*7b32f508SLoGin             let r = kmem_file
185*7b32f508SLoGin                 .read_at(&mut buf, slots_offset)
186*7b32f508SLoGin                 .expect("Failed to read kmem file.");
187*7b32f508SLoGin             assert!(r == buf.len());
188*7b32f508SLoGin 
189*7b32f508SLoGin             let mut logs = Vec::new();
190*7b32f508SLoGin 
191*7b32f508SLoGin             for chunck in buf.chunks(channel_header.slot_size as usize) {
192*7b32f508SLoGin                 let log_item = {
193*7b32f508SLoGin                     let log: Option<ObjectWrapper<AllocatorLog>> =
194*7b32f508SLoGin                         ObjectWrapper::new(&chunck[0..channel_header.element_size as usize]);
195*7b32f508SLoGin                     let log: ObjectWrapper<AllocatorLog> = log.unwrap();
196*7b32f508SLoGin 
197*7b32f508SLoGin                     if log.is_valid() {
198*7b32f508SLoGin                         Some(log)
199*7b32f508SLoGin                     } else {
200*7b32f508SLoGin                         None
201*7b32f508SLoGin                     }
202*7b32f508SLoGin                 };
203*7b32f508SLoGin                 if let Some(log_item) = log_item {
204*7b32f508SLoGin                     logs.push(log_item);
205*7b32f508SLoGin                 }
206*7b32f508SLoGin             }
207*7b32f508SLoGin             // 收集所有校验和正确的日志
208*7b32f508SLoGin             // info!("valid_cnt: {}, invalid_cnt: {}", valid_cnt, invalid_cnt);
209*7b32f508SLoGin             // info!("to send {} logs", logs.len());
210*7b32f508SLoGin             if !logs.is_empty() {
211*7b32f508SLoGin                 sender.send(MMLogWorkerResult::new(logs)).unwrap();
212*7b32f508SLoGin             }
213*7b32f508SLoGin         }
214*7b32f508SLoGin     }
215*7b32f508SLoGin 
216*7b32f508SLoGin     fn open_kmem_file(&self) -> std::io::Result<std::fs::File> {
217*7b32f508SLoGin         std::fs::OpenOptions::new().read(true).open(&self.kmem_path)
218*7b32f508SLoGin     }
219*7b32f508SLoGin 
220*7b32f508SLoGin     fn load_header(&self, kmem_file: &mut File) -> ObjectWrapper<MMLogChannel<1>> {
221*7b32f508SLoGin         let mut buf = [0u8; size_of::<MMLogChannel<1>>()];
222*7b32f508SLoGin         let symbol = self
223*7b32f508SLoGin             .mm_log_channel_symbol()
224*7b32f508SLoGin             .expect("Failed to get memory log channel symbol.");
225*7b32f508SLoGin 
226*7b32f508SLoGin         let sym_offset = symbol.memory_offset();
227*7b32f508SLoGin 
228*7b32f508SLoGin         let channel_header: Option<ObjectWrapper<MMLogChannel<1>>>;
229*7b32f508SLoGin 
230*7b32f508SLoGin         loop {
231*7b32f508SLoGin             let _r = kmem_file.read_at(&mut buf, sym_offset);
232*7b32f508SLoGin 
233*7b32f508SLoGin             let header: ObjectWrapper<MMLogChannel<1>> =
234*7b32f508SLoGin                 ObjectWrapper::new(&buf).expect("Failed to parse MMLogChannel header.");
235*7b32f508SLoGin             if header.magic == MMLogChannel::<1>::MM_LOG_CHANNEL_MAGIC {
236*7b32f508SLoGin                 info!("channel_header: {:?}", header);
237*7b32f508SLoGin                 channel_header = Some(header);
238*7b32f508SLoGin                 break;
239*7b32f508SLoGin             } else {
240*7b32f508SLoGin                 info!("MM Log Channel not found... Maybe the kernel not started? Or the kernel version is not supported?");
241*7b32f508SLoGin             }
242*7b32f508SLoGin 
243*7b32f508SLoGin             std::thread::sleep(std::time::Duration::from_secs(1));
244*7b32f508SLoGin         }
245*7b32f508SLoGin 
246*7b32f508SLoGin         return channel_header.unwrap();
247*7b32f508SLoGin     }
248*7b32f508SLoGin 
249*7b32f508SLoGin     /// Get the symbol of the memory log channel.
250*7b32f508SLoGin     fn mm_log_channel_symbol(&self) -> Option<Symbol> {
251*7b32f508SLoGin         self.mm_log_monitor
252*7b32f508SLoGin             .upgrade()
253*7b32f508SLoGin             .unwrap()
254*7b32f508SLoGin             .channel_symbol
255*7b32f508SLoGin             .clone()
256*7b32f508SLoGin     }
257*7b32f508SLoGin 
258*7b32f508SLoGin     /// Check if the monitor worker thread should stop.
259*7b32f508SLoGin     fn should_stop(&self) -> bool {
260*7b32f508SLoGin         self.mm_log_monitor
261*7b32f508SLoGin             .upgrade()
262*7b32f508SLoGin             .map(|mm_log_monitor| {
263*7b32f508SLoGin                 mm_log_monitor
264*7b32f508SLoGin                     .stop_child_threads
265*7b32f508SLoGin                     .load(std::sync::atomic::Ordering::Relaxed)
266*7b32f508SLoGin             })
267*7b32f508SLoGin             .unwrap_or(true)
268*7b32f508SLoGin     }
269*7b32f508SLoGin }
270*7b32f508SLoGin 
271*7b32f508SLoGin /// 内存日志监视器工作线程处理的结果
272*7b32f508SLoGin #[derive(Debug)]
273*7b32f508SLoGin struct MMLogWorkerResult {
274*7b32f508SLoGin     logs: Vec<ObjectWrapper<AllocatorLog>>,
275*7b32f508SLoGin }
276*7b32f508SLoGin 
277*7b32f508SLoGin impl MMLogWorkerResult {
278*7b32f508SLoGin     /// 创建一个新的内存日志监视器工作线程处理的结果
279*7b32f508SLoGin     ///
280*7b32f508SLoGin     /// ## 参数
281*7b32f508SLoGin     ///
282*7b32f508SLoGin     /// - `logs`:处理的日志
283*7b32f508SLoGin     pub fn new(logs: Vec<ObjectWrapper<AllocatorLog>>) -> Self {
284*7b32f508SLoGin         Self { logs }
285*7b32f508SLoGin     }
286*7b32f508SLoGin }
287