/* Crafter®.Thread Copyright (C) 2025 Catcrafts® Catcrafts.net This library is free software; you can redistribute it and/or modify it under the terms of the GNU Lesser General Public License as published by the Free Software Foundation; either version 3.0 of the License, or (at your option) any later version. This library is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License for more details. You should have received a copy of the GNU Lesser General Public License along with this library; if not, write to the Free Software Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA */ module Crafter.Thread:ThreadPool_impl; import std; import :ThreadPool; using namespace Crafter::Thread; void ThreadPool::Start(int maxThreads) { stopped = false; if (maxThreads > 0) { threadCount = maxThreads; } else { threadCount = std::thread::hardware_concurrency(); } threadStorage = new ThreadStorage[threadCount]; for (unsigned int i = 0; i < threadCount - 1; i++) { threadStorage[i].thread = std::thread(ThreadPool::WaitAndExecuteTask, threadStorage + i); threadIdMap[threadStorage[i].thread.get_id()] = &threadStorage[i]; } threadIdMap[std::this_thread::get_id()] = &threadStorage[threadCount - 1]; activeThreads = threadCount - 1; } void ThreadPool::Stop() { stopped = true; for (unsigned int i = 0; i < threadCount - 1; i++) { threadStorage[i].cv.notify_one(); threadStorage[i].thread.join(); } } void ThreadPool::Enqueue(std::function task) { unsigned int min = std::numeric_limits::max(); unsigned int minIndex = 0; for (unsigned int i = 0; i < activeThreads; i++) { unsigned int taskCount = threadStorage[i].taskCount; if (taskCount == 0) { minIndex = i; break; } if (taskCount < min) { min = taskCount; minIndex = i; } } { std::lock_guard lk(threadStorage[minIndex].mutex); threadStorage[minIndex].buffer.push_back(task); threadStorage[minIndex].taskCount++; } threadStorage[minIndex].cv.notify_one(); } void ThreadPool::Enqueue(std::vector> tasks) { unsigned int size = tasks.size(); unsigned int tasksPerThread = size / activeThreads; int remainder = size % activeThreads; if(remainder) { for (unsigned int i = 0; i < activeThreads - 1; i++) { { std::lock_guard lk(threadStorage[i].mutex); threadStorage[i].buffer.insert(threadStorage[i].buffer.end(), tasks.begin() + tasksPerThread * i, tasks.begin() + (tasksPerThread * i + tasksPerThread)); threadStorage[i].taskCount += tasksPerThread * i; } threadStorage[i].cv.notify_one(); } { std::lock_guard lk(threadStorage[activeThreads - 1].mutex); threadStorage[activeThreads - 1].buffer.insert(threadStorage[activeThreads - 1].buffer.end(), tasks.begin() + tasksPerThread * (activeThreads - 1), tasks.begin() + (tasksPerThread * (activeThreads - 1) + remainder)); threadStorage[activeThreads - 1].taskCount += tasksPerThread + remainder; } threadStorage[activeThreads - 1].cv.notify_one(); } else{ for (unsigned int i = 0; i < activeThreads; i++) { { std::lock_guard lk(threadStorage[i].mutex); threadStorage[i].buffer.insert(threadStorage[i].buffer.end(), tasks.begin() + tasksPerThread * i, tasks.begin() + (tasksPerThread * i + tasksPerThread)); threadStorage[i].taskCount += tasksPerThread * i; } threadStorage[i].cv.notify_one(); } } } void ThreadPool::JoinPool() { activeThreads++; ThreadPool::WaitAndExecuteTask(threadStorage + threadCount - 1); } void ThreadPool::WaitAndExecuteTask(ThreadStorage* threadStorage) { while (!stopped || !threadStorage->buffer.empty()) { std::unique_lock lock(threadStorage->mutex); threadStorage->cv.wait(lock, [threadStorage] { return !threadStorage->buffer.empty() || stopped; }); threadStorage->buffer.swap(threadStorage->tasks); lock.unlock(); for (std::function task : threadStorage->tasks) { task(); threadStorage->taskCount--; } threadStorage->tasks.clear(); } }