rust使用tokio快速统计百万级别文件个数
main.rs
use std::{env, sync::{atomic::{AtomicU32, Ordering}, Arc}, time::SystemTime};
use serde_json::json;
use tokio::runtime::Builder;
pub mod tokio_file;
///获取时间戳 毫秒
pub fn get_sys_timestamp_millis() -> u128 {
let now = SystemTime::now();
now.duration_since(SystemTime::UNIX_EPOCH)
.unwrap()
.as_millis()
}
fn cre_runtime() -> tokio::runtime::Runtime {
// 创建一个 Tokio 运行时,配置线程池的大小
Builder::new_multi_thread()
//.worker_threads(4) // 设置工作线程的数量
.max_blocking_threads(16) // 设置阻塞任务的最大线程数
.enable_all()
.build()
.unwrap()
}
fn main() {
// 获取命令行参数,返回一个迭代器
let args: Vec<String> = env::args().collect();
if args.len() !=2 {
println!("请输入统计的文件夹路径");
return;
}
// 第一个参数是程序的名称
let dir = args.get(1).unwrap().to_string();
// 其余参数是用户输入的参数
let runtime = cre_runtime();
let s = get_sys_timestamp_millis();
let counter = Arc::new(AtomicU32::new(0_u32));
runtime.block_on(async {
tokio_file::count_sub_file(dir, counter.clone()).await.unwrap();
});
let e = get_sys_timestamp_millis();
let current_value = counter.load(Ordering::SeqCst);
let data = json!({
"count": current_value,
"time": (e-s)
});
println!("{}", data);
}
count_sub_file函数:
/// @Author: DengLibin
/// @Date: Create in 2025-01-15 10:10:26
/// @Description: 统计文件夹下文件(夹)数量
pub async fn count_sub_file(
dir: String,
counter: Arc<AtomicU32>,
) -> Result<(), Box<dyn std::error::Error>> {
let semaphore = Arc::new(Semaphore::new(400)); // 限制并发数
//无界多生产者单消费者通道
let (tx, mut rx) = mpsc::unbounded_channel();
// 启动初始任务
tx.send(dir)?;
let active_tasks = Arc::new(AtomicUsize::new(0)); // 活跃任务计数器
// 消费队列
while let Some(path) = rx.recv().await {
if path.is_empty() {
//收到空表示结束
break;
}
let tx_clone = tx.clone();
let semaphore_clone = semaphore.clone();
let counter_clone = Arc::clone(&counter);
let active_tasks_clone = Arc::clone(&active_tasks);
//获取执行权限
let permit = semaphore_clone.acquire_owned().await?;
task::spawn(async move {
// 遍历目录
if let Ok(mut entries) = read_dir(&path).await {
while let Ok(Some(entry)) = entries.next_entry().await {
if let Ok(file_type) = entry.file_type().await {
// 计数
counter_clone.fetch_add(1, Ordering::SeqCst);
// 如果是目录,加入队列
if file_type.is_dir() {
let sub_dir = entry.path().to_string_lossy().into_owned();
let r = tx_clone.send(sub_dir.clone());
if let Err(e) = r {
println!("发送文件夹失败:{}, {}", sub_dir, e);
} else {
//发送了一个新任务
active_tasks_clone.fetch_add(1, Ordering::SeqCst);
}
}
}
}
}
//当前目录消费完成,最后一个了
if 1 == active_tasks_clone.fetch_sub(1, Ordering::SeqCst) {
let _ = tx_clone.send("".into()); //发送一个空
}
drop(permit); // 释放信号量
});
}
Ok(())
}