1
0
Fork 0
mirror of synced 2024-05-17 19:03:08 +12:00

Progress handler

This commit is contained in:
Rafał Mikrut 2023-05-01 20:37:58 +02:00
parent 34ea3bc96e
commit e16c28a2e9
5 changed files with 122 additions and 152 deletions

View file

@ -5,15 +5,15 @@ use std::io::BufWriter;
use std::path::PathBuf;
use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
use std::sync::Arc;
use std::thread::{sleep, JoinHandle};
use std::time::{Duration, SystemTime};
use std::{mem, thread};
use std::mem;
use std::time::SystemTime;
use crossbeam_channel::Receiver;
use mime_guess::get_mime_extensions;
use rayon::prelude::*;
use crate::common::{Common, LOOP_DURATION};
use crate::common::{prepare_thread_handler_common, Common};
use crate::common_dir_traversal::{CheckingMethod, DirTraversalBuilder, DirTraversalResult, FileEntry, ProgressData};
use crate::common_directory::Directories;
use crate::common_extensions::Extensions;
@ -317,39 +317,6 @@ impl BadExtensions {
}
}
fn prepare_bad_extension_thread_handler(
&self,
progress_sender: Option<&futures::channel::mpsc::UnboundedSender<ProgressData>>,
progress_thread_run: Arc<AtomicBool>,
atomic_counter: Arc<AtomicUsize>,
current_stage: u8,
max_stage: u8,
max_value: usize,
) -> JoinHandle<()> {
if let Some(progress_sender) = progress_sender {
let progress_send = progress_sender.clone();
let progress_thread_run = progress_thread_run;
let atomic_counter = atomic_counter;
thread::spawn(move || loop {
progress_send
.unbounded_send(ProgressData {
checking_method: CheckingMethod::None,
current_stage,
max_stage,
entries_checked: atomic_counter.load(Ordering::Relaxed),
entries_to_check: max_value,
})
.unwrap();
if !progress_thread_run.load(Ordering::Relaxed) {
break;
}
sleep(Duration::from_millis(LOOP_DURATION as u64));
})
} else {
thread::spawn(|| {})
}
}
fn look_for_bad_extensions_files(&mut self, stop_receiver: Option<&Receiver<()>>, progress_sender: Option<&futures::channel::mpsc::UnboundedSender<ProgressData>>) -> bool {
let system_time = SystemTime::now();
@ -360,8 +327,15 @@ impl BadExtensions {
let progress_thread_run = Arc::new(AtomicBool::new(true));
let atomic_file_counter = Arc::new(AtomicUsize::new(0));
let progress_thread_handle =
self.prepare_bad_extension_thread_handler(progress_sender, progress_thread_run.clone(), atomic_file_counter.clone(), 1, 1, self.files_to_check.len());
let progress_thread_handle = prepare_thread_handler_common(
progress_sender,
&progress_thread_run,
&atomic_file_counter,
1,
1,
self.files_to_check.len(),
CheckingMethod::None,
);
let mut files_to_check = Default::default();
mem::swap(&mut files_to_check, &mut self.files_to_check);

View file

@ -5,7 +5,7 @@ use std::path::{Path, PathBuf};
use std::sync::atomic::Ordering;
use std::sync::atomic::{AtomicBool, AtomicU64};
use std::sync::Arc;
use std::thread::sleep;
use std::thread::{sleep, JoinHandle};
use std::time::Duration;
use std::time::{SystemTime, UNIX_EPOCH};
use std::{fs, thread};
@ -148,22 +148,13 @@ impl BigFile {
self.allowed_extensions.set_allowed_extensions(allowed_extensions, &mut self.text_messages);
}
fn look_for_big_files(&mut self, stop_receiver: Option<&Receiver<()>>, progress_sender: Option<&futures::channel::mpsc::UnboundedSender<ProgressData>>) -> bool {
let start_time: SystemTime = SystemTime::now();
let mut folders_to_check: Vec<PathBuf> = Vec::with_capacity(1024 * 2); // This should be small enough too not see to big difference and big enough to store most of paths without needing to resize vector
let mut old_map: BTreeMap<u64, Vec<FileEntry>> = Default::default();
// Add root folders for finding
for id in &self.directories.included_directories {
folders_to_check.push(id.clone());
}
//// PROGRESS THREAD START
let progress_thread_run = Arc::new(AtomicBool::new(true));
let atomic_file_counter = Arc::new(AtomicU64::new(0));
let progress_thread_handle = if let Some(progress_sender) = progress_sender {
pub fn prepare_thread_handler(
&self,
progress_sender: Option<&futures::channel::mpsc::UnboundedSender<ProgressData>>,
progress_thread_run: &Arc<AtomicBool>,
atomic_file_counter: &Arc<AtomicU64>,
) -> JoinHandle<()> {
if let Some(progress_sender) = progress_sender {
let progress_send = progress_sender.clone();
let progress_thread_run = progress_thread_run.clone();
let atomic_file_counter = atomic_file_counter.clone();
@ -180,9 +171,23 @@ impl BigFile {
})
} else {
thread::spawn(|| {})
};
}
}
fn look_for_big_files(&mut self, stop_receiver: Option<&Receiver<()>>, progress_sender: Option<&futures::channel::mpsc::UnboundedSender<ProgressData>>) -> bool {
let start_time: SystemTime = SystemTime::now();
let mut folders_to_check: Vec<PathBuf> = Vec::with_capacity(1024 * 2); // This should be small enough too not see to big difference and big enough to store most of paths without needing to resize vector
let mut old_map: BTreeMap<u64, Vec<FileEntry>> = Default::default();
// Add root folders for finding
for id in &self.directories.included_directories {
folders_to_check.push(id.clone());
}
let progress_thread_run = Arc::new(AtomicBool::new(true));
let atomic_file_counter = Arc::new(AtomicU64::new(0));
let progress_thread_handle = self.prepare_thread_handler(progress_sender, &progress_thread_run, &atomic_file_counter);
//// PROGRESS THREAD END
while !folders_to_check.is_empty() {
if stop_receiver.is_some() && stop_receiver.unwrap().try_recv().is_ok() {
// End thread which send info to gui

View file

@ -5,7 +5,7 @@ use std::io::{BufReader, BufWriter};
use std::path::{Path, PathBuf};
use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
use std::sync::Arc;
use std::thread::sleep;
use std::thread::{sleep, JoinHandle};
use std::time::{Duration, SystemTime, UNIX_EPOCH};
use std::{fs, mem, panic, thread};
@ -198,6 +198,38 @@ impl BrokenFiles {
self.excluded_items.set_excluded_items(excluded_items, &mut self.text_messages);
}
pub fn prepare_thread_handler_broken_files(
&self,
progress_sender: Option<&futures::channel::mpsc::UnboundedSender<ProgressData>>,
progress_thread_run: &Arc<AtomicBool>,
atomic_counter: &Arc<AtomicUsize>,
current_stage: u8,
max_stage: u8,
max_value: usize,
) -> JoinHandle<()> {
if let Some(progress_sender) = progress_sender {
let progress_send = progress_sender.clone();
let progress_thread_run = progress_thread_run.clone();
let atomic_counter = atomic_counter.clone();
thread::spawn(move || loop {
progress_send
.unbounded_send(ProgressData {
current_stage,
max_stage,
files_checked: atomic_counter.load(Ordering::Relaxed),
files_to_check: max_value,
})
.unwrap();
if !progress_thread_run.load(Ordering::Relaxed) {
break;
}
sleep(Duration::from_millis(LOOP_DURATION as u64));
})
} else {
thread::spawn(|| {})
}
}
fn check_files(&mut self, stop_receiver: Option<&Receiver<()>>, progress_sender: Option<&futures::channel::mpsc::UnboundedSender<ProgressData>>) -> bool {
let start_time: SystemTime = SystemTime::now();
let mut folders_to_check: Vec<PathBuf> = Vec::with_capacity(1024 * 2); // This should be small enough too not see to big difference and big enough to store most of paths without needing to resize vector
@ -209,30 +241,9 @@ impl BrokenFiles {
//// PROGRESS THREAD START
let progress_thread_run = Arc::new(AtomicBool::new(true));
let atomic_file_counter = Arc::new(AtomicUsize::new(0));
let progress_thread_handle = self.prepare_thread_handler_broken_files(progress_sender, &progress_thread_run, &atomic_file_counter, 0, 1, 0);
let progress_thread_handle = if let Some(progress_sender) = progress_sender {
let progress_send = progress_sender.clone();
let progress_thread_run = progress_thread_run.clone();
let atomic_file_counter = atomic_file_counter.clone();
thread::spawn(move || loop {
progress_send
.unbounded_send(ProgressData {
current_stage: 0,
max_stage: 1,
files_checked: atomic_file_counter.load(Ordering::Relaxed),
files_to_check: 0,
})
.unwrap();
if !progress_thread_run.load(Ordering::Relaxed) {
break;
}
sleep(Duration::from_millis(LOOP_DURATION as u64));
})
} else {
thread::spawn(|| {})
};
//// PROGRESS THREAD END
while !folders_to_check.is_empty() {
@ -431,33 +442,10 @@ impl BrokenFiles {
non_cached_files_to_check = files_to_check;
}
//// PROGRESS THREAD START
let progress_thread_run = Arc::new(AtomicBool::new(true));
let atomic_file_counter = Arc::new(AtomicUsize::new(0));
let progress_thread_handle = self.prepare_thread_handler_broken_files(progress_sender, &progress_thread_run, &atomic_file_counter, 1, 1, non_cached_files_to_check.len());
let progress_thread_handle = if let Some(progress_sender) = progress_sender {
let progress_send = progress_sender.clone();
let progress_thread_run = progress_thread_run.clone();
let atomic_file_counter = atomic_file_counter.clone();
let files_to_check = non_cached_files_to_check.len();
thread::spawn(move || loop {
progress_send
.unbounded_send(ProgressData {
current_stage: 1,
max_stage: 1,
files_checked: atomic_file_counter.load(Ordering::Relaxed),
files_to_check,
})
.unwrap();
if !progress_thread_run.load(Ordering::Relaxed) {
break;
}
sleep(Duration::from_millis(LOOP_DURATION as u64));
})
} else {
thread::spawn(|| {})
};
//// PROGRESS THREAD END
let mut vec_file_entry: Vec<FileEntry> = non_cached_files_to_check
.into_par_iter()
.map(|(_, mut file_entry)| {

View file

@ -1,9 +1,12 @@
use std::ffi::OsString;
use std::fs;
use std::fs::{File, OpenOptions};
use std::io::BufReader;
use std::path::{Path, PathBuf};
use std::time::SystemTime;
use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
use std::sync::Arc;
use std::thread::{sleep, JoinHandle};
use std::time::{Duration, SystemTime};
use std::{fs, thread};
#[cfg(feature = "heif")]
use anyhow::Result;
@ -12,6 +15,7 @@ use image::{DynamicImage, ImageBuffer, Rgb};
use imagepipe::{ImageSource, Pipeline};
// #[cfg(feature = "heif")]
// use libheif_rs::LibHeif;
use crate::common_dir_traversal::{CheckingMethod, ProgressData};
#[cfg(feature = "heif")]
use libheif_rs::{ColorSpace, HeifContext, RgbChroma};
@ -318,6 +322,38 @@ impl Common {
}
}
}
pub fn prepare_thread_handler_common(
progress_sender: Option<&futures::channel::mpsc::UnboundedSender<ProgressData>>,
progress_thread_run: &Arc<AtomicBool>,
atomic_counter: &Arc<AtomicUsize>,
current_stage: u8,
max_stage: u8,
max_value: usize,
checking_method: CheckingMethod,
) -> JoinHandle<()> {
if let Some(progress_sender) = progress_sender {
let progress_send = progress_sender.clone();
let progress_thread_run = progress_thread_run.clone();
let atomic_counter = atomic_counter.clone();
thread::spawn(move || loop {
progress_send
.unbounded_send(ProgressData {
checking_method,
current_stage,
max_stage,
entries_checked: atomic_counter.load(Ordering::Relaxed),
entries_to_check: max_value,
})
.unwrap();
if !progress_thread_run.load(Ordering::Relaxed) {
break;
}
sleep(Duration::from_millis(LOOP_DURATION as u64));
})
} else {
thread::spawn(|| {})
}
}
#[cfg(test)]
mod test {

View file

@ -11,16 +11,16 @@ use std::os::unix::fs::MetadataExt;
use std::path::{Path, PathBuf};
use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
use std::sync::Arc;
use std::thread::{sleep, JoinHandle};
use std::time::{Duration, SystemTime};
use std::{fs, mem, thread};
use std::time::SystemTime;
use std::{fs, mem};
use crossbeam_channel::Receiver;
use humansize::format_size;
use humansize::BINARY;
use rayon::prelude::*;
use crate::common::{open_cache_folder, Common, LOOP_DURATION};
use crate::common::{open_cache_folder, prepare_thread_handler_common, Common};
use crate::common_dir_traversal::{CheckingMethod, DirTraversalBuilder, DirTraversalResult, FileEntry, ProgressData};
use crate::common_directory::Directories;
use crate::common_extensions::Extensions;
@ -644,41 +644,6 @@ impl DuplicateFinder {
}
}
// TODO Generalize this if possible with different tools
fn prepare_hash_thread_handler(
&self,
progress_sender: Option<&futures::channel::mpsc::UnboundedSender<ProgressData>>,
progress_thread_run: Arc<AtomicBool>,
atomic_counter: Arc<AtomicUsize>,
current_stage: u8,
max_stage: u8,
max_value: usize,
) -> JoinHandle<()> {
if let Some(progress_sender) = progress_sender {
let progress_send = progress_sender.clone();
let progress_thread_run = progress_thread_run;
let atomic_counter = atomic_counter;
let checking_method = self.check_method;
thread::spawn(move || loop {
progress_send
.unbounded_send(ProgressData {
checking_method,
current_stage,
max_stage,
entries_checked: atomic_counter.load(Ordering::Relaxed),
entries_to_check: max_value,
})
.unwrap();
if !progress_thread_run.load(Ordering::Relaxed) {
break;
}
sleep(Duration::from_millis(LOOP_DURATION as u64));
})
} else {
thread::spawn(|| {})
}
}
fn prehashing(
&mut self,
stop_receiver: Option<&Receiver<()>>,
@ -691,13 +656,14 @@ impl DuplicateFinder {
let progress_thread_run = Arc::new(AtomicBool::new(true));
let atomic_file_counter = Arc::new(AtomicUsize::new(0));
let progress_thread_handle = self.prepare_hash_thread_handler(
let progress_thread_handle = prepare_thread_handler_common(
progress_sender,
progress_thread_run.clone(),
atomic_file_counter.clone(),
&progress_thread_run,
&atomic_file_counter,
1,
2,
self.files_with_identical_size.values().map(Vec::len).sum(),
self.check_method,
);
let loaded_hash_map;
@ -836,13 +802,14 @@ impl DuplicateFinder {
let progress_thread_run = Arc::new(AtomicBool::new(true));
let atomic_file_counter = Arc::new(AtomicUsize::new(0));
let progress_thread_handle = self.prepare_hash_thread_handler(
let progress_thread_handle = prepare_thread_handler_common(
progress_sender,
progress_thread_run.clone(),
atomic_file_counter.clone(),
&progress_thread_run,
&atomic_file_counter,
2,
2,
pre_checked_map.values().map(Vec::len).sum(),
self.check_method,
);
//// PROGRESS THREAD END