1
0
Fork 0
mirror of synced 2024-05-17 19:03:08 +12:00

All thread handler simplifying

This commit is contained in:
Rafał Mikrut 2023-05-01 21:16:26 +02:00
parent f826d72f60
commit 30adc4542c
5 changed files with 131 additions and 237 deletions

View file

@ -3,14 +3,14 @@ use std::fs::Metadata;
use std::path::{Path, PathBuf};
use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
use std::sync::Arc;
use std::thread::sleep;
use std::time::{Duration, SystemTime, UNIX_EPOCH};
use std::{fs, thread};
use std::time::{SystemTime, UNIX_EPOCH};
use std::{fs};
use crossbeam_channel::Receiver;
use rayon::prelude::*;
use crate::common::LOOP_DURATION;
use crate::common::{prepare_thread_handler_common};
use crate::common_directory::Directories;
use crate::common_extensions::Extensions;
use crate::common_items::ExcludedItems;
@ -333,37 +333,17 @@ where
// Add root folders for finding
folders_to_check.extend(self.root_dirs);
//// PROGRESS THREAD START
let progress_thread_run = Arc::new(AtomicBool::new(true));
let atomic_entry_counter = Arc::new(AtomicUsize::new(0));
let progress_thread_handle = if let Some(progress_sender) = self.progress_sender {
let progress_send = progress_sender.clone();
let progress_thread_run = progress_thread_run.clone();
let atomic_entry_counter = atomic_entry_counter.clone();
let checking_method = self.checking_method;
let max_stage = self.max_stage;
thread::spawn(move || loop {
progress_send
.unbounded_send(ProgressData {
checking_method,
current_stage: 0,
max_stage,
entries_checked: atomic_entry_counter.load(Ordering::Relaxed),
entries_to_check: 0,
})
.unwrap();
if !progress_thread_run.load(Ordering::Relaxed) {
break;
}
sleep(Duration::from_millis(LOOP_DURATION as u64));
})
} else {
thread::spawn(|| {})
};
//// PROGRESS THREAD END
let progress_thread_handle = prepare_thread_handler_common(
self.progress_sender,
&progress_thread_run,
&atomic_entry_counter,
0,
self.max_stage,
0,
self.checking_method,
);
let DirTraversal {
collect,

View file

@ -5,9 +5,9 @@ use std::io::{BufReader, BufWriter};
use std::path::{Path, PathBuf};
use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
use std::sync::Arc;
use std::thread::sleep;
use std::time::{Duration, SystemTime};
use std::{mem, panic, thread};
use std::time::{SystemTime};
use std::{mem, panic};
use crossbeam_channel::Receiver;
use lofty::TaggedFileExt;
@ -15,8 +15,8 @@ use lofty::{read_from, AudioFile, ItemKey};
use rayon::prelude::*;
use serde::{Deserialize, Serialize};
use crate::common::{create_crash_message, AUDIO_FILES_EXTENSIONS};
use crate::common::{open_cache_folder, Common, LOOP_DURATION};
use crate::common::{create_crash_message, prepare_thread_handler_common, AUDIO_FILES_EXTENSIONS};
use crate::common::{open_cache_folder, Common};
use crate::common_dir_traversal::{CheckingMethod, DirTraversalBuilder, DirTraversalResult, FileEntry, ProgressData};
use crate::common_directory::Directories;
use crate::common_extensions::Extensions;
@ -357,35 +357,17 @@ impl SameMusic {
let check_was_stopped = AtomicBool::new(false); // Used for breaking from GUI and ending check thread
//// PROGRESS THREAD START
let progress_thread_run = Arc::new(AtomicBool::new(true));
let atomic_counter = Arc::new(AtomicUsize::new(0));
let progress_thread_handle = if let Some(progress_sender) = progress_sender {
let progress_send = progress_sender.clone();
let progress_thread_run = progress_thread_run.clone();
let atomic_counter = atomic_counter.clone();
let music_to_check = non_cached_files_to_check.len();
thread::spawn(move || loop {
progress_send
.unbounded_send(ProgressData {
checking_method: CheckingMethod::None,
current_stage: 1,
max_stage: 2,
entries_checked: atomic_counter.load(Ordering::Relaxed),
entries_to_check: music_to_check,
})
.unwrap();
if !progress_thread_run.load(Ordering::Relaxed) {
break;
}
sleep(Duration::from_millis(LOOP_DURATION as u64));
})
} else {
thread::spawn(|| {})
};
//// PROGRESS THREAD END
let progress_thread_handle = prepare_thread_handler_common(
progress_sender,
&progress_thread_run,
&atomic_counter,
1,
2,
non_cached_files_to_check.len(),
CheckingMethod::None,
);
// Clean for duplicate files
let mut vec_file_entry = non_cached_files_to_check
@ -524,38 +506,20 @@ impl SameMusic {
true
}
fn check_for_duplicate_tags(&mut self, stop_receiver: Option<&Receiver<()>>, progress_sender: Option<&futures::channel::mpsc::UnboundedSender<ProgressData>>) -> bool {
assert!(MusicSimilarity::NONE != self.music_similarity, "This can't be none");
assert_ne!(MusicSimilarity::NONE, self.music_similarity, "This can't be none");
let start_time: SystemTime = SystemTime::now();
//// PROGRESS THREAD START
let progress_thread_run = Arc::new(AtomicBool::new(true));
let atomic_counter = Arc::new(AtomicUsize::new(0));
let progress_thread_handle = if let Some(progress_sender) = progress_sender {
let progress_send = progress_sender.clone();
let progress_thread_run = progress_thread_run.clone();
let atomic_counter = atomic_counter.clone();
let music_to_check = self.music_to_check.len();
thread::spawn(move || loop {
progress_send
.unbounded_send(ProgressData {
checking_method: CheckingMethod::None,
current_stage: 2,
max_stage: 2,
entries_checked: atomic_counter.load(Ordering::Relaxed),
entries_to_check: music_to_check,
})
.unwrap();
if !progress_thread_run.load(Ordering::Relaxed) {
break;
}
sleep(Duration::from_millis(LOOP_DURATION as u64));
})
} else {
thread::spawn(|| {})
};
//// PROGRESS THREAD END
let progress_thread_handle = prepare_thread_handler_common(
progress_sender,
&progress_thread_run,
&atomic_counter,
2,
2,
self.music_to_check.len(),
CheckingMethod::None,
);
let mut old_duplicates: Vec<Vec<MusicEntry>> = vec![self.music_entries.clone()];
let mut new_duplicates: Vec<Vec<MusicEntry>> = Vec::new();

View file

@ -6,7 +6,7 @@ use std::panic;
use std::path::{Path, PathBuf};
use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
use std::sync::Arc;
use std::thread::sleep;
use std::thread::{sleep, JoinHandle};
use std::time::{Duration, SystemTime, UNIX_EPOCH};
use std::{fs, mem, thread};
@ -277,6 +277,38 @@ impl SimilarImages {
// self.delete_folders = delete_folder;
// }
pub fn prepare_thread_handler_similar_images(
&self,
progress_sender: Option<&futures::channel::mpsc::UnboundedSender<ProgressData>>,
progress_thread_run: &Arc<AtomicBool>,
atomic_counter: &Arc<AtomicUsize>,
current_stage: u8,
max_stage: u8,
max_value: usize,
) -> JoinHandle<()> {
if let Some(progress_sender) = progress_sender {
let progress_send = progress_sender.clone();
let progress_thread_run = progress_thread_run.clone();
let atomic_counter = atomic_counter.clone();
thread::spawn(move || loop {
progress_send
.unbounded_send(ProgressData {
current_stage,
max_stage,
images_checked: atomic_counter.load(Ordering::Relaxed),
images_to_check: max_value,
})
.unwrap();
if !progress_thread_run.load(Ordering::Relaxed) {
break;
}
sleep(Duration::from_millis(LOOP_DURATION as u64));
})
} else {
thread::spawn(|| {})
}
}
/// Function to check if folder are empty.
/// Parameter `initial_checking` for second check before deleting to be sure that checked folder is still empty
fn check_for_similar_images(&mut self, stop_receiver: Option<&Receiver<()>>, progress_sender: Option<&futures::channel::mpsc::UnboundedSender<ProgressData>>) -> bool {
@ -301,33 +333,9 @@ impl SimilarImages {
folders_to_check.push(id.clone());
}
//// PROGRESS THREAD START
let progress_thread_run = Arc::new(AtomicBool::new(true));
let atomic_counter = Arc::new(AtomicUsize::new(0));
let progress_thread_handle = if let Some(progress_sender) = progress_sender {
let progress_send = progress_sender.clone();
let progress_thread_run = progress_thread_run.clone();
let atomic_counter = atomic_counter.clone();
thread::spawn(move || loop {
progress_send
.unbounded_send(ProgressData {
current_stage: 0,
max_stage: 3,
images_checked: atomic_counter.load(Ordering::Relaxed),
images_to_check: 0,
})
.unwrap();
if !progress_thread_run.load(Ordering::Relaxed) {
break;
}
sleep(Duration::from_millis(LOOP_DURATION as u64));
})
} else {
thread::spawn(|| {})
};
//// PROGRESS THREAD END
let progress_thread_handle = self.prepare_thread_handler_similar_images(progress_sender, &progress_thread_run, &atomic_counter, 0, 3, 0);
while !folders_to_check.is_empty() {
if stop_receiver.is_some() && stop_receiver.unwrap().try_recv().is_ok() {
@ -525,36 +533,11 @@ impl SimilarImages {
Common::print_time(hash_map_modification, SystemTime::now(), "sort_images - reading data from cache and preparing them");
let hash_map_modification = SystemTime::now();
//// PROGRESS THREAD START
let check_was_stopped = AtomicBool::new(false); // Used for breaking from GUI and ending check thread
let progress_thread_run = Arc::new(AtomicBool::new(true));
let atomic_counter = Arc::new(AtomicUsize::new(0));
let progress_thread_handle = self.prepare_thread_handler_similar_images(progress_sender, &progress_thread_run, &atomic_counter, 1, 3, non_cached_files_to_check.len());
let progress_thread_handle = if let Some(progress_sender) = progress_sender {
let progress_send = progress_sender.clone();
let progress_thread_run = progress_thread_run.clone();
let atomic_counter = atomic_counter.clone();
let images_to_check = non_cached_files_to_check.len();
thread::spawn(move || loop {
progress_send
.unbounded_send(ProgressData {
current_stage: 1,
max_stage: 3,
images_checked: atomic_counter.load(Ordering::Relaxed),
images_to_check,
})
.unwrap();
if !progress_thread_run.load(Ordering::Relaxed) {
break;
}
sleep(Duration::from_millis(LOOP_DURATION as u64));
})
} else {
thread::spawn(|| {})
};
//// PROGRESS THREAD END
let mut vec_file_entry: Vec<(FileEntry, Vec<u8>)> = non_cached_files_to_check
.into_par_iter()
.map(|(_s, mut file_entry)| {
@ -706,34 +689,10 @@ impl SimilarImages {
}
}
} else {
//// PROGRESS THREAD START
let check_was_stopped = AtomicBool::new(false); // Used for breaking from GUI and ending check thread
let progress_thread_run = Arc::new(AtomicBool::new(true));
let atomic_mode_counter = Arc::new(AtomicUsize::new(0));
let progress_thread_handle = if let Some(progress_sender) = progress_sender {
let progress_send = progress_sender.clone();
let progress_thread_run = progress_thread_run.clone();
let atomic_mode_counter = atomic_mode_counter.clone();
let all_combinations_to_check = all_hashes.len();
thread::spawn(move || loop {
progress_send
.unbounded_send(ProgressData {
current_stage: 2,
max_stage: 2,
images_checked: atomic_mode_counter.load(Ordering::Relaxed),
images_to_check: all_combinations_to_check,
})
.unwrap();
if !progress_thread_run.load(Ordering::Relaxed) {
break;
}
sleep(Duration::from_millis(LOOP_DURATION as u64));
})
} else {
thread::spawn(|| {})
};
//// PROGRESS THREAD END
let progress_thread_handle = self.prepare_thread_handler_similar_images(progress_sender, &progress_thread_run, &atomic_mode_counter, 2, 2, all_hashes.len());
// Don't use hashes with multiple images in bktree, because they will always be master of group and cannot be find by other hashes
let mut hashes_with_multiple_images: HashSet<_> = Default::default(); // Fast way to check if hash have multiple images

View file

@ -5,7 +5,7 @@ use std::io::*;
use std::path::{Path, PathBuf};
use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
use std::sync::Arc;
use std::thread::sleep;
use std::thread::{sleep, JoinHandle};
use std::time::{Duration, SystemTime, UNIX_EPOCH};
use std::{fs, mem, thread};
@ -246,6 +246,38 @@ impl SimilarVideos {
// self.delete_folders = delete_folder;
// }
pub fn prepare_thread_handler_similar_video(
&self,
progress_sender: Option<&futures::channel::mpsc::UnboundedSender<ProgressData>>,
progress_thread_run: &Arc<AtomicBool>,
atomic_counter: &Arc<AtomicUsize>,
current_stage: u8,
max_stage: u8,
max_value: usize,
) -> JoinHandle<()> {
if let Some(progress_sender) = progress_sender {
let progress_send = progress_sender.clone();
let progress_thread_run = progress_thread_run.clone();
let atomic_counter = atomic_counter.clone();
thread::spawn(move || loop {
progress_send
.unbounded_send(ProgressData {
current_stage,
max_stage,
videos_checked: atomic_counter.load(Ordering::Relaxed),
videos_to_check: max_value,
})
.unwrap();
if !progress_thread_run.load(Ordering::Relaxed) {
break;
}
sleep(Duration::from_millis(LOOP_DURATION as u64));
})
} else {
thread::spawn(|| {})
}
}
/// Function to check if folder are empty.
/// Parameter `initial_checking` for second check before deleting to be sure that checked folder is still empty
fn check_for_similar_videos(&mut self, stop_receiver: Option<&Receiver<()>>, progress_sender: Option<&futures::channel::mpsc::UnboundedSender<ProgressData>>) -> bool {
@ -266,33 +298,9 @@ impl SimilarVideos {
folders_to_check.push(id.clone());
}
//// PROGRESS THREAD START
let progress_thread_run = Arc::new(AtomicBool::new(true));
let atomic_counter = Arc::new(AtomicUsize::new(0));
let progress_thread_handle = if let Some(progress_sender) = progress_sender {
let progress_send = progress_sender.clone();
let progress_thread_run = progress_thread_run.clone();
let atomic_counter = atomic_counter.clone();
thread::spawn(move || loop {
progress_send
.unbounded_send(ProgressData {
current_stage: 0,
max_stage: 1,
videos_checked: atomic_counter.load(Ordering::Relaxed),
videos_to_check: 0,
})
.unwrap();
if !progress_thread_run.load(Ordering::Relaxed) {
break;
}
sleep(Duration::from_millis(LOOP_DURATION as u64));
})
} else {
thread::spawn(|| {})
};
//// PROGRESS THREAD END
let progress_thread_handle = self.prepare_thread_handler_similar_video(progress_sender, &progress_thread_run, &atomic_counter, 0, 1, 0);
while !folders_to_check.is_empty() {
if stop_receiver.is_some() && stop_receiver.unwrap().try_recv().is_ok() {
@ -487,30 +495,8 @@ impl SimilarVideos {
let progress_thread_run = Arc::new(AtomicBool::new(true));
let atomic_counter = Arc::new(AtomicUsize::new(0));
let progress_thread_handle = self.prepare_thread_handler_similar_video(progress_sender, &progress_thread_run, &atomic_counter, 1, 1, non_cached_files_to_check.len());
let progress_thread_handle = if let Some(progress_sender) = progress_sender {
let progress_send = progress_sender.clone();
let progress_thread_run = progress_thread_run.clone();
let atomic_counter = atomic_counter.clone();
let videos_to_check = non_cached_files_to_check.len();
thread::spawn(move || loop {
progress_send
.unbounded_send(ProgressData {
current_stage: 1,
max_stage: 1,
videos_checked: atomic_counter.load(Ordering::Relaxed),
videos_to_check,
})
.unwrap();
if !progress_thread_run.load(Ordering::Relaxed) {
break;
}
sleep(Duration::from_millis(LOOP_DURATION as u64));
})
} else {
thread::spawn(|| {})
};
//// PROGRESS THREAD END
let mut vec_file_entry: Vec<FileEntry> = non_cached_files_to_check
.par_iter()
.map(|file_entry| {

View file

@ -4,7 +4,7 @@ use std::io::BufWriter;
use std::path::PathBuf;
use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
use std::sync::Arc;
use std::thread::sleep;
use std::thread::{sleep, JoinHandle};
use std::time::{Duration, SystemTime, UNIX_EPOCH};
use std::{fs, thread};
@ -134,21 +134,13 @@ impl Temporary {
self.excluded_items.set_excluded_items(excluded_items, &mut self.text_messages);
}
fn check_files(&mut self, stop_receiver: Option<&Receiver<()>>, progress_sender: Option<&futures::channel::mpsc::UnboundedSender<ProgressData>>) -> bool {
let start_time: SystemTime = SystemTime::now();
let mut folders_to_check: Vec<PathBuf> = Vec::with_capacity(1024 * 2); // This should be small enough too not see to big difference and big enough to store most of paths without needing to resize vector
// Add root folders for finding
for id in &self.directories.included_directories {
folders_to_check.push(id.clone());
}
//// PROGRESS THREAD START
let progress_thread_run = Arc::new(AtomicBool::new(true));
let atomic_counter = Arc::new(AtomicUsize::new(0));
let progress_thread_handle = if let Some(progress_sender) = progress_sender {
fn prepare_thread_handler_temporary(
&self,
progress_sender: Option<&futures::channel::mpsc::UnboundedSender<ProgressData>>,
progress_thread_run: &Arc<AtomicBool>,
atomic_counter: &Arc<AtomicUsize>,
) -> JoinHandle<()> {
if let Some(progress_sender) = progress_sender {
let progress_send = progress_sender.clone();
let progress_thread_run = progress_thread_run.clone();
let atomic_counter = atomic_counter.clone();
@ -167,8 +159,21 @@ impl Temporary {
})
} else {
thread::spawn(|| {})
};
//// PROGRESS THREAD END
}
}
fn check_files(&mut self, stop_receiver: Option<&Receiver<()>>, progress_sender: Option<&futures::channel::mpsc::UnboundedSender<ProgressData>>) -> bool {
let start_time: SystemTime = SystemTime::now();
let mut folders_to_check: Vec<PathBuf> = Vec::with_capacity(1024 * 2); // This should be small enough too not see to big difference and big enough to store most of paths without needing to resize vector
// Add root folders for finding
for id in &self.directories.included_directories {
folders_to_check.push(id.clone());
}
let progress_thread_run = Arc::new(AtomicBool::new(true));
let atomic_counter = Arc::new(AtomicUsize::new(0));
let progress_thread_handle = self.prepare_thread_handler_temporary(progress_sender, &progress_thread_run, &atomic_counter);
while !folders_to_check.is_empty() {
if stop_receiver.is_some() && stop_receiver.unwrap().try_recv().is_ok() {