1 use std::{ 2 collections::{BTreeMap, VecDeque}, 3 env::Vars, 4 path::PathBuf, 5 process::{Command, Stdio}, 6 sync::{Arc, RwLock}, 7 time::SystemTime, 8 }; 9 10 use chrono::{DateTime, Utc}; 11 use dadk_config::user::UserCleanLevel; 12 use log::{debug, error, info, warn}; 13 14 use crate::{ 15 context::{Action, DadkUserExecuteContext}, 16 executor::cache::CacheDir, 17 parser::{ 18 task::{CodeSource, PrebuiltSource, TaskType}, 19 task_log::{BuildStatus, InstallStatus, TaskLog}, 20 }, 21 scheduler::{SchedEntities, SchedEntity}, 22 utils::{file::FileUtils, path::abs_path}, 23 }; 24 25 use dadk_config::common::task::TaskEnv; 26 27 use self::cache::{CacheDirType, TaskDataDir}; 28 29 pub mod cache; 30 pub mod source; 31 #[cfg(test)] 32 mod tests; 33 34 lazy_static! { 35 // 全局环境变量的列表 36 pub static ref ENV_LIST: RwLock<EnvMap> = RwLock::new(EnvMap::new()); 37 } 38 39 #[derive(Debug, Clone)] 40 pub struct Executor { 41 entity: Arc<SchedEntity>, 42 action: Action, 43 local_envs: EnvMap, 44 /// 任务构建结果输出到的目录 45 build_dir: CacheDir, 46 /// 如果任务需要源文件缓存,则此字段为 Some(CacheDir),否则为 None(使用本地源文件路径) 47 source_dir: Option<CacheDir>, 48 /// 任务数据目录 49 task_data_dir: TaskDataDir, 50 /// DragonOS sysroot的路径 51 dragonos_sysroot: PathBuf, 52 } 53 54 impl Executor { 55 /// # 创建执行器 56 /// 57 /// 用于执行一个任务 58 /// 59 /// ## 参数 60 /// 61 /// * `entity` - 任务调度实体 62 /// 63 /// ## 返回值 64 /// 65 /// * `Ok(Executor)` - 创建成功 66 /// * `Err(ExecutorError)` - 创建失败 67 pub fn new( 68 entity: Arc<SchedEntity>, 69 action: Action, 70 dragonos_sysroot: PathBuf, 71 ) -> Result<Self, ExecutorError> { 72 let local_envs = EnvMap::new(); 73 let build_dir = CacheDir::new(entity.clone(), CacheDirType::Build)?; 74 let task_data_dir = TaskDataDir::new(entity.clone())?; 75 76 let source_dir = if CacheDir::need_source_cache(&entity) { 77 Some(CacheDir::new(entity.clone(), CacheDirType::Source)?) 78 } else { 79 None 80 }; 81 82 let result: Executor = Self { 83 action, 84 entity, 85 local_envs, 86 build_dir, 87 source_dir, 88 task_data_dir, 89 dragonos_sysroot, 90 }; 91 92 return Ok(result); 93 } 94 95 /// # 执行任务 96 /// 97 /// 创建执行器后,调用此方法执行任务。 98 /// 该方法会执行以下步骤: 99 /// 100 /// 1. 创建工作线程 101 /// 2. 准备环境变量 102 /// 3. 拉取数据(可选) 103 /// 4. 执行构建 104 pub fn execute(&mut self) -> Result<(), ExecutorError> { 105 info!("Execute task: {}", self.entity.task().name_version()); 106 107 let r = self.do_execute(); 108 self.save_task_data(r.clone()); 109 info!("Task {} finished", self.entity.task().name_version()); 110 return r; 111 } 112 113 /// # 保存任务数据 114 fn save_task_data(&self, r: Result<(), ExecutorError>) { 115 let mut task_log = self.task_data_dir.task_log(); 116 match self.action { 117 Action::Build => { 118 if r.is_ok() { 119 task_log.set_build_status(BuildStatus::Success); 120 } else { 121 task_log.set_build_status(BuildStatus::Failed); 122 } 123 124 task_log.set_build_time_now(); 125 } 126 127 Action::Install => { 128 if r.is_ok() { 129 task_log.set_install_status(InstallStatus::Success); 130 } else { 131 task_log.set_install_status(InstallStatus::Failed); 132 } 133 task_log.set_install_time_now(); 134 } 135 136 Action::Clean(_) => { 137 task_log.clean_build_status(); 138 task_log.clean_install_status(); 139 } 140 } 141 142 self.task_data_dir 143 .save_task_log(&task_log) 144 .expect("Failed to save task log"); 145 } 146 147 fn do_execute(&mut self) -> Result<(), ExecutorError> { 148 // 准备本地环境变量 149 self.prepare_local_env()?; 150 151 match self.action { 152 Action::Build => { 153 // 构建任务 154 self.build()?; 155 } 156 Action::Install => { 157 // 把构建结果安装到DragonOS 158 self.install()?; 159 } 160 Action::Clean(_) => { 161 // 清理构建结果 162 let r = self.clean(); 163 if let Err(e) = r { 164 error!( 165 "Failed to clean task {}: {:?}", 166 self.entity.task().name_version(), 167 e 168 ); 169 } 170 } 171 } 172 173 return Ok(()); 174 } 175 176 fn build(&mut self) -> Result<(), ExecutorError> { 177 if let Some(status) = self.task_log().build_status() { 178 if let Some(build_time) = self.task_log().build_time() { 179 let mut last_modified = last_modified_time(&self.entity.file_path(), build_time)?; 180 last_modified = core::cmp::max( 181 last_modified, 182 last_modified_time(&self.src_work_dir(), build_time)?, 183 ); 184 185 if *status == BuildStatus::Success 186 && (self.entity.task().build_once || last_modified < *build_time) 187 { 188 info!( 189 "Task {} has been built successfully, skip build.", 190 self.entity.task().name_version() 191 ); 192 return Ok(()); 193 } 194 } 195 } 196 197 return self.do_build(); 198 } 199 200 /// # 执行build操作 201 fn do_build(&mut self) -> Result<(), ExecutorError> { 202 // 确认源文件就绪 203 self.prepare_input()?; 204 205 let command: Option<Command> = self.create_command()?; 206 if let Some(cmd) = command { 207 self.run_command(cmd)?; 208 } 209 210 // 检查构建结果,如果为空,则抛出警告 211 if self.build_dir.is_empty()? { 212 warn!( 213 "Task {}: build result is empty, do you forget to copy the result to [$DADK_CURRENT_BUILD_DIR]?", 214 self.entity.task().name_version(), 215 ); 216 } 217 return Ok(()); 218 } 219 220 fn install(&self) -> Result<(), ExecutorError> { 221 log::trace!("dadk-user: install {}", self.entity.task().name_version()); 222 if let Some(status) = self.task_log().install_status() { 223 if let Some(install_time) = self.task_log().install_time() { 224 let last_modified = last_modified_time(&self.build_dir.path, install_time)?; 225 let last_modified = core::cmp::max( 226 last_modified, 227 last_modified_time(&self.entity.file_path(), install_time)?, 228 ); 229 230 if *status == InstallStatus::Success 231 && (self.entity.task().install_once || last_modified < *install_time) 232 { 233 info!( 234 "install: Task {} not changed.", 235 self.entity.task().name_version() 236 ); 237 return Ok(()); 238 } 239 } 240 } 241 log::trace!( 242 "dadk-user: to do install {}", 243 self.entity.task().name_version() 244 ); 245 return self.do_install(); 246 } 247 248 /// # 执行安装操作,把构建结果安装到DragonOS 249 fn do_install(&self) -> Result<(), ExecutorError> { 250 let binding = self.entity.task(); 251 let in_dragonos_path = binding.install.in_dragonos_path.as_ref(); 252 // 如果没有指定安装路径,则不执行安装 253 if in_dragonos_path.is_none() { 254 return Ok(()); 255 } 256 info!("Installing task: {}", self.entity.task().name_version()); 257 let mut in_dragonos_path = in_dragonos_path.unwrap().to_string_lossy().to_string(); 258 259 debug!("in_dragonos_path: {}", in_dragonos_path); 260 // 去除开头的斜杠 261 { 262 let count_leading_slashes = in_dragonos_path.chars().take_while(|c| *c == '/').count(); 263 in_dragonos_path = in_dragonos_path[count_leading_slashes..].to_string(); 264 } 265 // 拼接最终的安装路径 266 let install_path = abs_path(&self.dragonos_sysroot.join(in_dragonos_path)); 267 debug!("install_path: {:?}", install_path); 268 // 创建安装路径 269 std::fs::create_dir_all(&install_path).map_err(|e| { 270 ExecutorError::InstallError(format!("Failed to create install path: {}", e.to_string())) 271 })?; 272 273 // 拷贝构建结果到安装路径 274 let build_dir: PathBuf = self.build_dir.path.clone(); 275 FileUtils::copy_dir_all(&build_dir, &install_path) 276 .map_err(|e| ExecutorError::InstallError(e))?; 277 info!("Task {} installed.", self.entity.task().name_version()); 278 279 return Ok(()); 280 } 281 282 fn clean(&self) -> Result<(), ExecutorError> { 283 let level = if let Action::Clean(l) = self.action { 284 l 285 } else { 286 panic!( 287 "BUG: clean() called with non-clean action. executor details: {:?}", 288 self 289 ); 290 }; 291 info!( 292 "Cleaning task: {}, level={level:?}", 293 self.entity.task().name_version() 294 ); 295 296 let r: Result<(), ExecutorError> = match level { 297 UserCleanLevel::All => self.clean_all(), 298 UserCleanLevel::InSrc => self.clean_src(), 299 UserCleanLevel::Output => { 300 self.clean_target()?; 301 self.clean_cache() 302 } 303 }; 304 305 if let Err(e) = r { 306 error!( 307 "Failed to clean task: {}, error message: {:?}", 308 self.entity.task().name_version(), 309 e 310 ); 311 return Err(e); 312 } 313 314 return Ok(()); 315 } 316 317 fn clean_all(&self) -> Result<(), ExecutorError> { 318 // 在源文件目录执行清理 319 self.clean_src()?; 320 // 清理构建结果 321 self.clean_target()?; 322 // 清理缓存 323 self.clean_cache()?; 324 return Ok(()); 325 } 326 327 /// 在源文件目录执行清理 328 fn clean_src(&self) -> Result<(), ExecutorError> { 329 let cmd: Option<Command> = self.create_command()?; 330 if cmd.is_none() { 331 // 如果这里没有命令,则认为用户不需要在源文件目录执行清理 332 return Ok(()); 333 } 334 info!( 335 "{}: Cleaning in source directory: {:?}", 336 self.entity.task().name_version(), 337 self.src_work_dir() 338 ); 339 340 let cmd = cmd.unwrap(); 341 self.run_command(cmd)?; 342 return Ok(()); 343 } 344 345 /// 清理构建输出目录 346 fn clean_target(&self) -> Result<(), ExecutorError> { 347 info!( 348 "{}: Cleaning build target directory: {:?}", 349 self.entity.task().name_version(), 350 self.build_dir.path 351 ); 352 353 return self.build_dir.remove_self_recursive(); 354 } 355 356 /// 清理下载缓存 357 fn clean_cache(&self) -> Result<(), ExecutorError> { 358 let cache_dir = self.source_dir.as_ref(); 359 if cache_dir.is_none() { 360 // 如果没有缓存目录,则认为用户不需要清理缓存 361 return Ok(()); 362 } 363 info!( 364 "{}: Cleaning cache directory: {}", 365 self.entity.task().name_version(), 366 self.src_work_dir().display() 367 ); 368 return cache_dir.unwrap().remove_self_recursive(); 369 } 370 371 /// 获取源文件的工作目录 372 fn src_work_dir(&self) -> PathBuf { 373 if let Some(local_path) = self.entity.task().source_path() { 374 return local_path; 375 } 376 return self.source_dir.as_ref().unwrap().path.clone(); 377 } 378 379 fn task_log(&self) -> TaskLog { 380 return self.task_data_dir.task_log(); 381 } 382 383 /// 为任务创建命令 384 fn create_command(&self) -> Result<Option<Command>, ExecutorError> { 385 // 获取命令 386 let raw_cmd = match self.entity.task().task_type { 387 TaskType::BuildFromSource(_) => match self.action { 388 Action::Build => self.entity.task().build.build_command.clone(), 389 Action::Clean(_) => self.entity.task().clean.clean_command.clone(), 390 _ => unimplemented!( 391 "create_command: Action {:?} not supported yet.", 392 self.action 393 ), 394 }, 395 396 TaskType::InstallFromPrebuilt(_) => match self.action { 397 Action::Build => self.entity.task().build.build_command.clone(), 398 Action::Clean(_) => self.entity.task().clean.clean_command.clone(), 399 _ => unimplemented!( 400 "create_command: Action {:?} not supported yet.", 401 self.action 402 ), 403 }, 404 }; 405 406 if raw_cmd.is_none() { 407 return Ok(None); 408 } 409 410 let raw_cmd = raw_cmd.unwrap(); 411 412 let mut command = Command::new("bash"); 413 command.current_dir(self.src_work_dir()); 414 415 // 设置参数 416 command.arg("-c"); 417 command.arg(raw_cmd); 418 419 // 设置环境变量 420 let env_list = ENV_LIST.read().unwrap(); 421 for (key, value) in env_list.envs.iter() { 422 // if key.starts_with("DADK") { 423 // debug!("DADK env found: {}={}", key, value.value); 424 // } 425 command.env(key, value.value.clone()); 426 } 427 drop(env_list); 428 for (key, value) in self.local_envs.envs.iter() { 429 debug!("Local env found: {}={}", key, value.value); 430 command.env(key, value.value.clone()); 431 } 432 433 return Ok(Some(command)); 434 } 435 436 /// # 准备工作线程本地环境变量 437 fn prepare_local_env(&mut self) -> Result<(), ExecutorError> { 438 let binding = self.entity.task(); 439 let task_envs: Option<&Vec<TaskEnv>> = binding.envs.as_ref(); 440 441 if let Some(task_envs) = task_envs { 442 for tv in task_envs.iter() { 443 self.local_envs 444 .add(EnvVar::new(tv.key().to_string(), tv.value().to_string())); 445 } 446 } 447 448 // 添加`DADK_CURRENT_BUILD_DIR`环境变量,便于构建脚本把构建结果拷贝到这里 449 self.local_envs.add(EnvVar::new( 450 "DADK_CURRENT_BUILD_DIR".to_string(), 451 self.build_dir.path.to_str().unwrap().to_string(), 452 )); 453 454 return Ok(()); 455 } 456 457 fn prepare_input(&self) -> Result<(), ExecutorError> { 458 // 拉取源文件 459 let task = self.entity.task(); 460 match &task.task_type { 461 TaskType::BuildFromSource(cs) => { 462 if self.source_dir.is_none() { 463 return Ok(()); 464 } 465 let source_dir = self.source_dir.as_ref().unwrap(); 466 match cs { 467 CodeSource::Git(git) => { 468 git.prepare(source_dir) 469 .map_err(|e| ExecutorError::PrepareEnvError(e))?; 470 } 471 // 本地源文件,不需要拉取 472 CodeSource::Local(_) => return Ok(()), 473 // 在线压缩包,需要下载 474 CodeSource::Archive(archive) => { 475 archive 476 .download_unzip(source_dir) 477 .map_err(|e| ExecutorError::PrepareEnvError(e))?; 478 } 479 } 480 } 481 TaskType::InstallFromPrebuilt(pb) => { 482 match pb { 483 // 本地源文件,不需要拉取 484 PrebuiltSource::Local(local_source) => { 485 let local_path = local_source.path(); 486 let target_path = &self.build_dir.path; 487 FileUtils::copy_dir_all(&local_path, &target_path) 488 .map_err(|e| ExecutorError::TaskFailed(e))?; // let mut cmd = "cp -r ".to_string(); 489 return Ok(()); 490 } 491 // 在线压缩包,需要下载 492 PrebuiltSource::Archive(archive) => { 493 archive 494 .download_unzip(&self.build_dir) 495 .map_err(|e| ExecutorError::PrepareEnvError(e))?; 496 } 497 } 498 } 499 } 500 501 return Ok(()); 502 } 503 504 fn run_command(&self, mut command: Command) -> Result<(), ExecutorError> { 505 let mut child = command 506 .stdin(Stdio::inherit()) 507 .spawn() 508 .map_err(|e| ExecutorError::IoError(e.to_string()))?; 509 510 // 等待子进程结束 511 let r = child 512 .wait() 513 .map_err(|e| ExecutorError::IoError(e.to_string())); 514 debug!("Command finished: {:?}", r); 515 if r.is_ok() { 516 let r = r.unwrap(); 517 if r.success() { 518 return Ok(()); 519 } else { 520 // 执行失败,获取最后100行stderr输出 521 let errmsg = format!( 522 "Task {} failed, exit code = {}", 523 self.entity.task().name_version(), 524 r.code().unwrap() 525 ); 526 error!("{errmsg}"); 527 let command_opt = command.output(); 528 if command_opt.is_err() { 529 return Err(ExecutorError::TaskFailed( 530 "Failed to get command output".to_string(), 531 )); 532 } 533 let command_opt = command_opt.unwrap(); 534 let command_output = String::from_utf8_lossy(&command_opt.stderr); 535 let mut last_100_outputs = command_output 536 .lines() 537 .rev() 538 .take(100) 539 .collect::<Vec<&str>>(); 540 last_100_outputs.reverse(); 541 error!("Last 100 lines msg of stderr:"); 542 for line in last_100_outputs { 543 error!("{}", line); 544 } 545 return Err(ExecutorError::TaskFailed(errmsg)); 546 } 547 } else { 548 let errmsg = format!( 549 "Task {} failed, msg = {:?}", 550 self.entity.task().name_version(), 551 r.err().unwrap() 552 ); 553 error!("{errmsg}"); 554 return Err(ExecutorError::TaskFailed(errmsg)); 555 } 556 } 557 } 558 559 #[derive(Debug, Clone)] 560 pub struct EnvMap { 561 pub envs: BTreeMap<String, EnvVar>, 562 } 563 564 impl EnvMap { 565 pub fn new() -> Self { 566 Self { 567 envs: BTreeMap::new(), 568 } 569 } 570 571 pub fn add(&mut self, env: EnvVar) { 572 self.envs.insert(env.key.clone(), env); 573 } 574 575 #[allow(dead_code)] 576 pub fn get(&self, key: &str) -> Option<&EnvVar> { 577 self.envs.get(key) 578 } 579 580 pub fn add_vars(&mut self, vars: Vars) { 581 for (key, value) in vars { 582 self.add(EnvVar::new(key, value)); 583 } 584 } 585 } 586 587 /// # 环境变量 588 #[derive(Debug, PartialEq, PartialOrd, Eq, Ord, Clone)] 589 pub struct EnvVar { 590 pub key: String, 591 pub value: String, 592 } 593 594 impl EnvVar { 595 pub fn new(key: String, value: String) -> Self { 596 Self { key, value } 597 } 598 } 599 600 /// # 任务执行器错误枚举 601 #[allow(dead_code)] 602 #[derive(Debug, Clone)] 603 pub enum ExecutorError { 604 /// 准备执行环境错误 605 PrepareEnvError(String), 606 IoError(String), 607 /// 构建执行错误 608 TaskFailed(String), 609 /// 安装错误 610 InstallError(String), 611 /// 清理错误 612 CleanError(String), 613 } 614 615 /// # 准备全局环境变量 616 pub fn prepare_env( 617 sched_entities: &SchedEntities, 618 execute_ctx: &Arc<DadkUserExecuteContext>, 619 ) -> Result<(), ExecutorError> { 620 info!("Preparing environment variables..."); 621 let env_list = create_global_env_list(sched_entities, execute_ctx)?; 622 // 写入全局环境变量列表 623 let mut global_env_list = ENV_LIST.write().unwrap(); 624 *global_env_list = env_list; 625 return Ok(()); 626 } 627 628 /// # 创建全局环境变量列表 629 fn create_global_env_list( 630 sched_entities: &SchedEntities, 631 execute_ctx: &Arc<DadkUserExecuteContext>, 632 ) -> Result<EnvMap, ExecutorError> { 633 let mut env_list = EnvMap::new(); 634 let envs: Vars = std::env::vars(); 635 env_list.add_vars(envs); 636 637 // 为每个任务创建特定的环境变量 638 for entity in sched_entities.entities().iter() { 639 // 导出任务的构建目录环境变量 640 let build_dir = CacheDir::build_dir(entity.clone())?; 641 642 let build_dir_key = CacheDir::build_dir_env_key(&entity)?; 643 env_list.add(EnvVar::new( 644 build_dir_key, 645 build_dir.to_str().unwrap().to_string(), 646 )); 647 648 // 如果需要源码缓存目录,则导出 649 if CacheDir::need_source_cache(entity) { 650 let source_dir = CacheDir::source_dir(entity.clone())?; 651 let source_dir_key = CacheDir::source_dir_env_key(&entity)?; 652 env_list.add(EnvVar::new( 653 source_dir_key, 654 source_dir.to_str().unwrap().to_string(), 655 )); 656 } 657 } 658 659 // 创建ARCH环境变量 660 let target_arch = execute_ctx.target_arch(); 661 env_list.add(EnvVar::new("ARCH".to_string(), (*target_arch).into())); 662 663 return Ok(env_list); 664 } 665 666 /// # 获取文件最后的更新时间 667 /// 668 /// ## 参数 669 /// * `path` - 文件路径 670 /// * `last_modified` - 最后的更新时间 671 /// * `build_time` - 构建时间 672 fn last_modified_time( 673 path: &PathBuf, 674 build_time: &DateTime<Utc>, 675 ) -> Result<DateTime<Utc>, ExecutorError> { 676 let mut queue = VecDeque::new(); 677 queue.push_back(path.clone()); 678 679 let mut last_modified = DateTime::<Utc>::from(SystemTime::UNIX_EPOCH); 680 681 while let Some(current_path) = queue.pop_front() { 682 let metadata = current_path 683 .metadata() 684 .map_err(|e| ExecutorError::InstallError(e.to_string()))?; 685 686 if metadata.is_dir() { 687 for r in std::fs::read_dir(¤t_path).unwrap() { 688 if let Ok(entry) = r { 689 // 忽略编译产物目录 690 if entry.file_name() == "target" { 691 continue; 692 } 693 694 let entry_path = entry.path(); 695 let entry_metadata = entry.metadata().unwrap(); 696 // 比较文件的修改时间和last_modified,取最大值 697 let file_modified = DateTime::<Utc>::from(entry_metadata.modified().unwrap()); 698 last_modified = std::cmp::max(last_modified, file_modified); 699 700 // 如果其中某一个文件的修改时间在build_time之后,则直接返回,不用继续搜索 701 if last_modified > *build_time { 702 return Ok(last_modified); 703 } 704 705 if entry_metadata.is_dir() { 706 // 如果是子目录,则将其加入队列 707 queue.push_back(entry_path); 708 } 709 } 710 } 711 } else { 712 // 如果是文件,直接比较修改时间 713 let file_modified = DateTime::<Utc>::from(metadata.modified().unwrap()); 714 last_modified = std::cmp::max(last_modified, file_modified); 715 716 // 如果其中某一个文件的修改时间在build_time之后,则直接返回,不用继续递归 717 if last_modified > *build_time { 718 return Ok(last_modified); 719 } 720 } 721 } 722 723 if last_modified == DateTime::<Utc>::from(SystemTime::UNIX_EPOCH) { 724 return Err(ExecutorError::InstallError(format!( 725 "Failed to get last modified time for path: {}", 726 path.display() 727 ))); 728 } 729 Ok(last_modified) 730 } 731