xref: /DragonOS/tools/debugging/logmonitor/src/backend/monitor/mm.rs (revision 1a72a751b18cf5bbe7b5b9e91aff530de0c18501)
1 use std::{
2     fs::File,
3     mem::size_of,
4     os::unix::prelude::FileExt,
5     path::PathBuf,
6     sync::{atomic::AtomicBool, mpsc, Arc, Mutex, Weak},
7     thread::JoinHandle,
8 };
9 
10 use klog_types::{AllocatorLog, MMLogChannel};
11 use log::info;
12 
13 use crate::backend::{
14     loader::Symbol,
15     monitor::{logset::LogSet, ObjectWrapper},
16     BackendData,
17 };
18 
19 #[derive(Debug)]
20 pub struct MMLogMonitor {
21     channel_symbol: Option<Symbol>,
22     shared_data: Arc<Mutex<BackendData>>,
23     /// All threads spawned by the mm log monitor.
24     threads: Mutex<Vec<JoinHandle<()>>>,
25     stop_child_threads: AtomicBool,
26     self_ref: Weak<Self>,
27 
28     mm_log_receiver: Mutex<mpsc::Receiver<MMLogWorkerResult>>,
29     mm_log_sender: mpsc::Sender<MMLogWorkerResult>,
30 }
31 
32 impl MMLogMonitor {
33     pub fn new(shared_data: Arc<Mutex<BackendData>>) -> Arc<Self> {
34         let guard = shared_data.lock().unwrap();
35         let mm_log_buffer_symbol: Option<Symbol> = guard
36             .kernel_metadata
37             .as_ref()
38             .map(|km| {
39                 km.sym_collection()
40                     .find_by_name("__MM_ALLOCATOR_LOG_CHANNEL")
41                     .map(|s| s.clone())
42             })
43             .flatten();
44         drop(guard);
45 
46         info!("mm_log_buffer_symbol: {:?}", mm_log_buffer_symbol);
47 
48         let mm_log_worker_mpsc: (
49             mpsc::Sender<MMLogWorkerResult>,
50             mpsc::Receiver<MMLogWorkerResult>,
51         ) = mpsc::channel::<MMLogWorkerResult>();
52 
53         let r = Self {
54             channel_symbol: mm_log_buffer_symbol,
55             shared_data,
56             threads: Mutex::new(Vec::new()),
57             stop_child_threads: AtomicBool::new(false),
58             self_ref: Weak::new(),
59             mm_log_receiver: Mutex::new(mm_log_worker_mpsc.1),
60             mm_log_sender: mm_log_worker_mpsc.0,
61         };
62 
63         let r = Arc::new(r);
64         unsafe {
65             let self_ref = Arc::downgrade(&r);
66             let r_ptr = r.as_ref() as *const Self as *mut Self;
67             (*r_ptr).self_ref = self_ref;
68         }
69 
70         return r;
71     }
72 
73     pub fn run(&self) {
74         info!("MMLogMonitor::run()");
75 
76         self.create_threads();
77 
78         let mut logs_set =
79             LogSet::<usize, ObjectWrapper<AllocatorLog>>::new("mm_allocator_log".to_string(), None);
80 
81         self.handle_logs(&mut logs_set);
82         // std::thread::sleep(std::time::Duration::from_micros(50));
83     }
84 
85     fn handle_logs(&self, logs_set: &mut LogSet<usize, ObjectWrapper<AllocatorLog>>) {
86         let mut last_cnt = 0;
87         let mut last_time = std::time::Instant::now();
88         let mm_log_receiver = self.mm_log_receiver.lock().unwrap();
89         loop {
90             let logs = mm_log_receiver.recv();
91             if logs.is_err() {
92                 return;
93             }
94 
95             let logs = logs.unwrap();
96 
97             for log in logs.logs {
98                 logs_set.insert(log.id as usize, log);
99             }
100 
101             let x = logs_set.len();
102             // info!("logs_set.len(): {}", x);
103             let current_time = std::time::Instant::now();
104             if current_time.duration_since(last_time).as_secs() >= 1 {
105                 info!("memory log rate: {} logs/s", x - last_cnt);
106                 last_cnt = x;
107                 last_time = current_time;
108             }
109         }
110     }
111 
112     // fn show_speed(&self, )
113 
114     fn create_threads(&self) {
115         let km = self
116             .shared_data
117             .lock()
118             .unwrap()
119             .kmem_path
120             .clone()
121             .expect("DragonOS memory map file not specified.");
122         let monitor_weak = self.self_ref.clone();
123 
124         let handle = std::thread::spawn(move || {
125             let mut monitor_thread = MMMonitorThread::new(monitor_weak, PathBuf::from(km));
126             monitor_thread.run();
127         });
128 
129         self.threads.lock().unwrap().push(handle);
130     }
131 }
132 
133 #[derive(Debug)]
134 struct MMMonitorThread {
135     mm_log_monitor: Weak<MMLogMonitor>,
136     kmem_path: PathBuf,
137 }
138 
139 impl MMMonitorThread {
140     /// Constructs a new instance of [`MMMonitorThread`].
141     ///
142     /// ## Parameters
143     ///
144     /// - `mm_log_monitor`: The [`MMLogMonitor`] instance.
145     /// - `kmem_path`: The path to the kernel memory file.
146     pub fn new(mm_log_monitor: Weak<MMLogMonitor>, kmem_path: PathBuf) -> Self {
147         Self {
148             mm_log_monitor,
149             kmem_path,
150         }
151     }
152 
153     pub fn run(&mut self) {
154         info!("MMMonitorThread::run(): kmem_path: {:?}", self.kmem_path);
155 
156         let mut kmem_file = self.open_kmem_file().expect("Failed to open kmem file.");
157 
158         info!("Channel header loaded!");
159 
160         let channel_header: ObjectWrapper<MMLogChannel<1>> = self.load_header(&mut kmem_file);
161 
162         // 循环读取
163 
164         self.process_logs(&mut kmem_file, &channel_header);
165     }
166 
167     /// 处理内核内存分配日志
168     fn process_logs(&self, kmem_file: &mut File, channel_header: &ObjectWrapper<MMLogChannel<1>>) {
169         let cap = channel_header.capacity;
170         let mut buf = vec![0u8; (cap * channel_header.slot_size as u64) as usize];
171         let symbol = self
172             .mm_log_channel_symbol()
173             .expect("Failed to get memory log channel symbol.");
174 
175         let sym_offset = symbol.memory_offset();
176 
177         let slots_offset = channel_header.slots_offset + sym_offset;
178         let sender = self.mm_log_monitor.upgrade().unwrap().mm_log_sender.clone();
179         loop {
180             if self.should_stop() {
181                 break;
182             }
183 
184             let r = kmem_file
185                 .read_at(&mut buf, slots_offset)
186                 .expect("Failed to read kmem file.");
187             assert!(r == buf.len());
188 
189             let mut logs = Vec::new();
190 
191             for chunck in buf.chunks(channel_header.slot_size as usize) {
192                 let log_item = {
193                     let log: Option<ObjectWrapper<AllocatorLog>> =
194                         ObjectWrapper::new(&chunck[0..channel_header.element_size as usize]);
195                     let log: ObjectWrapper<AllocatorLog> = log.unwrap();
196 
197                     if log.is_valid() {
198                         Some(log)
199                     } else {
200                         None
201                     }
202                 };
203                 if let Some(log_item) = log_item {
204                     logs.push(log_item);
205                 }
206             }
207             // 收集所有校验和正确的日志
208             // info!("valid_cnt: {}, invalid_cnt: {}", valid_cnt, invalid_cnt);
209             // info!("to send {} logs", logs.len());
210             if !logs.is_empty() {
211                 sender.send(MMLogWorkerResult::new(logs)).unwrap();
212             }
213         }
214     }
215 
216     fn open_kmem_file(&self) -> std::io::Result<std::fs::File> {
217         std::fs::OpenOptions::new().read(true).open(&self.kmem_path)
218     }
219 
220     fn load_header(&self, kmem_file: &mut File) -> ObjectWrapper<MMLogChannel<1>> {
221         let mut buf = [0u8; size_of::<MMLogChannel<1>>()];
222         let symbol = self
223             .mm_log_channel_symbol()
224             .expect("Failed to get memory log channel symbol.");
225 
226         let sym_offset = symbol.memory_offset();
227 
228         let channel_header: Option<ObjectWrapper<MMLogChannel<1>>>;
229 
230         loop {
231             let _r = kmem_file.read_at(&mut buf, sym_offset);
232 
233             let header: ObjectWrapper<MMLogChannel<1>> =
234                 ObjectWrapper::new(&buf).expect("Failed to parse MMLogChannel header.");
235             if header.magic == MMLogChannel::<1>::MM_LOG_CHANNEL_MAGIC {
236                 info!("channel_header: {:?}", header);
237                 channel_header = Some(header);
238                 break;
239             } else {
240                 info!("MM Log Channel not found... Maybe the kernel not started? Or the kernel version is not supported?");
241             }
242 
243             std::thread::sleep(std::time::Duration::from_secs(1));
244         }
245 
246         return channel_header.unwrap();
247     }
248 
249     /// Get the symbol of the memory log channel.
250     fn mm_log_channel_symbol(&self) -> Option<Symbol> {
251         self.mm_log_monitor
252             .upgrade()
253             .unwrap()
254             .channel_symbol
255             .clone()
256     }
257 
258     /// Check if the monitor worker thread should stop.
259     fn should_stop(&self) -> bool {
260         self.mm_log_monitor
261             .upgrade()
262             .map(|mm_log_monitor| {
263                 mm_log_monitor
264                     .stop_child_threads
265                     .load(std::sync::atomic::Ordering::Relaxed)
266             })
267             .unwrap_or(true)
268     }
269 }
270 
271 /// 内存日志监视器工作线程处理的结果
272 #[derive(Debug)]
273 struct MMLogWorkerResult {
274     logs: Vec<ObjectWrapper<AllocatorLog>>,
275 }
276 
277 impl MMLogWorkerResult {
278     /// 创建一个新的内存日志监视器工作线程处理的结果
279     ///
280     /// ## 参数
281     ///
282     /// - `logs`:处理的日志
283     pub fn new(logs: Vec<ObjectWrapper<AllocatorLog>>) -> Self {
284         Self { logs }
285     }
286 }
287