【HPC】 C/C++多线程并行/并发

主要介绍 C/C++ 多线程并行/并发:C 语言示例使用 POSIX 线程(pthread),C++ 示例使用标准库 std::thread。

多线程并行/并发

同一进程内的各个线程共享内存,因此可以把数据划分成若干段,交给多个线程并行处理,每个线程负责其中一部分,最后再合并结果。这一思路与 OpenMP 的并行 for 类似。

线程

  • 操作系统调度的最小单位
  • 每个进程可以有多个线程,线程之间共享进程的内存空间,但有自己的栈、寄存器等
  • 线程的创建与切换有一定的系统开销:创建时要分配线程栈等资源,切换时要保存和恢复上下文,线程之间还需要同步。因此应根据数据规模选择合适的线程数量;数据量很小时,多线程反而可能比单线程更慢。
  • 在多核处理器上,线程可以实现真正的并行计算。
  • 在 C 语言中,线程通常通过 POSIX 线程(pthread)库来实现;C++11 起可以直接使用标准库的 std::thread(在 Linux 上其底层通常也是 pthread)。

使用

用多线程优化下面算子:

1
2
3
4
5
6
7
/* Sequential baseline: returns the sum of arr[0] .. arr[len-1] (0 when len <= 0). */
int sum_array(int *arr, int len) {
  int total = 0;
  int *cursor = arr;
  for (int remaining = len; remaining > 0; --remaining) {
    total += *cursor++;
  }
  return total;
}

C例子

优化完成程序:(编译和链接时都要加上 -pthread 选项,例如 gcc sum.c -o sum -pthread,让 GCC 启用 POSIX 线程支持)

仅使用多线程

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
#include <pthread.h>
#include <stdio.h>
#include <stdlib.h>

/* Arguments and result slot for one worker thread. */
typedef struct {
    int *arr;         /* shared input array (only read by workers) */
    int start;        /* first index of this thread's range (inclusive) */
    int end;          /* one past the last index of the range (exclusive) */
    int partial_sum;  /* written once by the worker when its range is done */
    int started;      /* nonzero if pthread_create succeeded and the thread must be joined */
} ThreadData;

/*
 * Worker entry point: sums arr[start, end) and stores the result in
 * data->partial_sum.
 *
 * The sum is accumulated in a local variable and the shared struct is written
 * only once. Writing data->partial_sum inside the loop makes every thread
 * repeatedly store into adjacent ThreadData entries (often the same cache
 * line, i.e. false sharing). Because arr and partial_sum are both int, the
 * compiler may not be able to keep that field in a register.
 */
void *sum_partial(void *arg) {
    ThreadData *data = (ThreadData *)arg;
    int local_sum = 0;
    for (int i = data->start; i < data->end; ++i) {
        local_sum += data->arr[i];
    }
    data->partial_sum = local_sum;
    return NULL;
}

/*
 * Sums arr[0, len) using up to num_threads POSIX threads.
 *
 * arr         - input array; may be NULL only when len <= 0
 * len         - number of elements; len <= 0 yields 0
 * num_threads - requested thread count; values < 1 are treated as 1, and the
 *               count is capped at len so that no thread gets an empty range
 * returns the total, computed with int arithmetic (it can overflow exactly
 * like the sequential version)
 *
 * If allocation fails, the whole sum is computed in the calling thread. If a
 * particular pthread_create fails, that chunk is summed in the calling thread
 * instead of joining an invalid handle.
 */
int sum_array(int *arr, int len, int num_threads) {
    if (arr == NULL || len <= 0) {
        return 0;
    }
    if (num_threads < 1) {
        num_threads = 1;
    }
    if (num_threads > len) {
        num_threads = len;
    }

    /* Explicit casts keep this valid as both C and C++. */
    pthread_t *threads = (pthread_t *)malloc(sizeof(pthread_t) * (size_t)num_threads);
    ThreadData *thread_data = (ThreadData *)malloc(sizeof(ThreadData) * (size_t)num_threads);
    if (threads == NULL || thread_data == NULL) {
        /* Out of memory: fall back to a sequential sum in this thread. */
        free(threads);
        free(thread_data);
        ThreadData whole = {arr, 0, len, 0, 0};
        sum_partial(&whole);
        return whole.partial_sum;
    }

    /*
     * Split [0, len) into num_threads contiguous ranges whose sizes differ by
     * at most one: the first `remainder` ranges get one extra element. The
     * ranges tile [0, len) exactly, so no thread reads past arr[len - 1].
     */
    int chunk_size = len / num_threads;
    int remainder = len % num_threads;
    int next_start = 0;

    for (int i = 0; i < num_threads; ++i) {
        int count = chunk_size + (i < remainder ? 1 : 0);
        thread_data[i].arr = arr;
        thread_data[i].start = next_start;
        thread_data[i].end = next_start + count;
        thread_data[i].partial_sum = 0;
        next_start += count;

        thread_data[i].started =
            (pthread_create(&threads[i], NULL, sum_partial, &thread_data[i]) == 0);
        if (!thread_data[i].started) {
            /* Could not spawn a thread: compute this chunk here instead. */
            sum_partial(&thread_data[i]);
        }
    }

    /* Wait for the spawned threads, then combine all partial sums. */
    int total_sum = 0;
    for (int i = 0; i < num_threads; ++i) {
        if (thread_data[i].started) {
            pthread_join(threads[i], NULL);
        }
        total_sum += thread_data[i].partial_sum;
    }

    free(threads);
    free(thread_data);
    return total_sum;
}

int main() {
    int arr[] = {1, 2, 3, 4, 5, 6, 7, 8, 9, 10};
    /* Element count, not byte count: sizeof(arr) alone is 10 * sizeof(int)
       bytes and would make sum_array read far past the end of arr. */
    int len = (int)(sizeof(arr) / sizeof(arr[0]));
    int num_threads = 4;

    int result = sum_array(arr, len, num_threads);
    printf("Total sum: %d\n", result);
    return 0;
}

使用多线程+互斥锁

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
#include <stdio.h>
#include <pthread.h>

#define NUM_THREADS 4  // number of worker threads

// Arguments for one worker thread.
typedef struct {
    int *arr;                // shared input array (read-only for workers)
    int start;               // first index of this thread's range (inclusive)
    int end;                 // one past the last index of the range (exclusive)
    int *sum;                // shared total, only modified while holding *mutex
    pthread_mutex_t *mutex;  // protects *sum
} ThreadData;

// Worker: sums arr[start, end) into a private variable, then adds that
// subtotal to *sum while holding the mutex. The lock is therefore taken once
// per thread rather than once per element.
void* thread_sum(void *arg) {
    ThreadData *data = (ThreadData *)arg;
    int partial_sum = 0;

    for (int i = data->start; i < data->end; ++i) {
        partial_sum += data->arr[i];
    }

    // Serialize updates to the shared total.
    pthread_mutex_lock(data->mutex);
    *data->sum += partial_sum;
    pthread_mutex_unlock(data->mutex);
    return NULL;
}

// Adds arr[0, len) to *sum using NUM_THREADS threads and returns the final *sum.
// *sum is added to, not overwritten, so the caller should initialize it
// (e.g. to 0). sum_mutex must already be initialized by the caller.
// If a thread cannot be created, its range is summed in the calling thread
// instead; pthread_join is only called on threads that were actually created.
int sum_array(int *arr, int len, int *sum, pthread_mutex_t *sum_mutex) {
    pthread_t threads[NUM_THREADS];
    ThreadData thread_data[NUM_THREADS];
    int started[NUM_THREADS];
    int chunk_size = len / NUM_THREADS;

    // Create the worker threads.
    for (int i = 0; i < NUM_THREADS; ++i) {
        thread_data[i].arr = arr;
        thread_data[i].start = i * chunk_size;
        // The last thread also takes the len % NUM_THREADS leftover elements.
        thread_data[i].end = (i == NUM_THREADS - 1) ? len : (i + 1) * chunk_size;
        thread_data[i].sum = sum;
        thread_data[i].mutex = sum_mutex;
        started[i] = (pthread_create(&threads[i], NULL, thread_sum, (void*)&thread_data[i]) == 0);
        if (!started[i]) {
            // Thread creation failed: do this chunk here rather than lose it.
            thread_sum(&thread_data[i]);
        }
    }

    // Wait for every thread that was created.
    for (int i = 0; i < NUM_THREADS; ++i) {
        if (started[i]) {
            pthread_join(threads[i], NULL);
        }
    }
    return *sum;
}

int main() {
    int values[] = {1, 2, 3, 4, 5, 6, 7, 8, 9, 10};
    const int count = (int)(sizeof values / sizeof *values);

    // Shared accumulator plus the mutex that guards it; sum_array adds into total.
    int total = 0;
    pthread_mutex_t total_lock;
    pthread_mutex_init(&total_lock, NULL);

    const int answer = sum_array(values, count, &total, &total_lock);
    printf("Sum of array: %d\n", answer);

    pthread_mutex_destroy(&total_lock);
    return 0;
}

C++ 例子

优化完成程序:(编译和链接时都要加上 -pthread 选项,例如 g++ -std=c++14 sum.cpp -o sum -pthread,让 GCC 启用线程支持;最后一个示例使用了 C++14 的数字分隔符 1'000'000)

仅使用多线程

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
#include <iostream>
#include <vector>
#include <thread>

// Sums arr[0, len) with up to num_threads std::threads (default 4, matching
// the original fixed count).
//
// - len <= 0 (or a null arr) returns 0.
// - num_threads < 1 is treated as 1, and the count is capped at len so no
//   thread is started for an empty range.
// - Each thread accumulates into a local variable and writes its slot in
//   partial_sums exactly once. Adjacent ints in one vector share a cache
//   line, so per-element writes there would cause false sharing.
// - If creating a thread throws (std::system_error), the threads already
//   started are joined before rethrowing. Otherwise their destructors would
//   call std::terminate.
int sum_array(int *arr, int len, int num_threads = 4) {
    if (arr == nullptr || len <= 0) {
        return 0;
    }
    if (num_threads < 1) {
        num_threads = 1;
    }
    if (num_threads > len) {
        num_threads = len;
    }

    const int chunk_size = len / num_threads;
    std::vector<int> partial_sums(num_threads, 0);
    std::vector<std::thread> threads;
    threads.reserve(num_threads);

    // Sums one contiguous range; the last thread also takes the remainder.
    auto sum_part = [&](int thread_id) {
        const int start = thread_id * chunk_size;
        const int end = (thread_id == num_threads - 1) ? len : start + chunk_size;
        int local_sum = 0;
        for (int i = start; i < end; ++i) {
            local_sum += arr[i];
        }
        partial_sums[thread_id] = local_sum;
    };

    try {
        for (int i = 0; i < num_threads; ++i) {
            threads.emplace_back(sum_part, i);
        }
    } catch (...) {
        for (auto &t : threads) {
            t.join();
        }
        throw;
    }
    for (auto &t : threads) {
        t.join();
    }

    int total_sum = 0;
    for (int part : partial_sums) {
        total_sum += part;
    }
    return total_sum;
}

int main() {
    // Ten sample values; their total is printed.
    int arr[] = {1, 2, 3, 4, 5, 6, 7, 8, 9, 10};
    const int len = static_cast<int>(sizeof arr / sizeof *arr);

    std::cout << "Sum of array: " << sum_array(arr, len) << std::endl;
    return 0;
}

使用多线程+原子操作

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
#include <iostream>
#include <vector>
#include <thread>
#include <atomic>
#include <numeric>  // std::accumulate

// Adds data[start, end) to the shared atomic `result`.
// The range is reduced into a local long long first, so the atomic is touched
// only once per call (a single seq_cst fetch_add).
// The seed is 0LL so the reduction runs in long long; a plain 0 would make
// std::accumulate sum in int and overflow on large inputs.
void partial_sum(const std::vector<int>& data, size_t start, size_t end, std::atomic<long long>& result) {
    long long subtotal = 0LL;
    if (start < end) {
        const auto first = data.begin() + static_cast<std::ptrdiff_t>(start);
        const auto last = data.begin() + static_cast<std::ptrdiff_t>(end);
        subtotal = std::accumulate(first, last, 0LL);
    }
    result.fetch_add(subtotal);
}

int main() {
    const size_t data_size = 10000000;
    std::vector<int> data(data_size, 1);
    // Fixed at 4 here; std::thread::hardware_concurrency() reports the hardware thread count.
    const size_t num_threads = 4;
    std::atomic<long long> result(0);

    std::vector<std::thread> workers;
    workers.reserve(num_threads);
    const size_t chunk = data_size / num_threads;
    for (size_t t = 0; t < num_threads; ++t) {
        const size_t lo = t * chunk;
        // The last worker also covers the data_size % num_threads leftover elements.
        const size_t hi = (t + 1 == num_threads) ? data_size : lo + chunk;
        workers.emplace_back(partial_sum, std::cref(data), lo, hi, std::ref(result));
    }

    for (std::thread& w : workers) {
        w.join();
    }
    std::cout << "Total sum: " << result.load() << std::endl;
    return 0;
}

使用多线程+互斥锁

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
#include <iostream>
#include <vector>
#include <thread>
#include <mutex>

// Sums data[start, end) without holding any lock, then adds the subtotal to
// the shared `result` while holding `mtx`.
// std::lock_guard releases the mutex when it leaves scope (RAII). Unlike a
// manual lock()/unlock() pair, the mutex cannot be left locked if the critical
// section ever returns early or throws.
void partial_sum(const std::vector<int>& data, size_t start, size_t end, long long& result, std::mutex& mtx) {
    long long local_sum = 0;
    for (size_t i = start; i < end; ++i) {
        local_sum += data[i];
    }

    std::lock_guard<std::mutex> lock(mtx);
    result += local_sum;
}

int main() {
    const size_t data_size = 1'000'000;
    const size_t thread_count = 4;
    const std::vector<int> data(data_size, 1);
    long long result = 0;  // shared total, updated by workers under mtx
    std::mutex mtx;

    std::vector<std::thread> threads;
    const size_t chunk_size = data_size / thread_count;
    size_t begin = 0;
    for (size_t i = 0; i < thread_count; ++i) {
        // The last thread runs to data_size so leftover elements are not dropped.
        const bool is_last = (i + 1 == thread_count);
        const size_t stop = is_last ? data_size : begin + chunk_size;
        threads.emplace_back(partial_sum, std::cref(data), begin, stop, std::ref(result), std::ref(mtx));
        begin = stop;
    }

    for (std::thread& t : threads) {
        t.join();
    }
    std::cout << "Total sum: " << result << std::endl;
    return 0;
}
Licensed under CC BY-NC-SA 4.0
最后更新于 Feb 19, 2025 00:00 +0800
loveleaves
使用 Hugo 构建
主题 StackJimmy 设计