UC3M

Telematic/Audiovisual Syst./Communication Syst. Engineering

Systems Architecture

September 2017 - January 2018

11.5.2.  Concurrent array processing (with threads and mutex)

Work Plan

This exercise analyzes a program that processes a table (an array) concurrently. The idea is to split the work among several threads, which accumulate their results in a shared variable. This variable is protected by a mutex.

The basic idea is to take a long data structure and divide the work of processing it among several threads:

  1. In the example given there are 4 threads; each one processes a quarter of the table and adds its partial result to the shared variable called sum.

  2. When all threads have finished execution and leave the do_work function, the main function ends.

  3. The variable sum is shared by all threads. Each thread locks a mutex before reading and writing it. This is necessary to synchronize the threads' read and write operations on sum.

/******************************************************************************
* compile with gcc -pthread *.c -o loops
* test with valgrind --tool=helgrind ./loops
*
******************************************************************************/
#include <pthread.h>
#include <stdio.h>
#include <stdlib.h>

#define NTHREADS      4
#define ARRAYSIZE   100000000
#define ITERATIONS   (ARRAYSIZE / NTHREADS)  /* parenthesized so the macro expands safely inside larger expressions */

double  sum=0.0;
double a[ARRAYSIZE];
pthread_mutex_t sum_mutex;


void *do_work(void *tid) 
{
  int i, start, *mytid, end;
  double mysum=0.0;

  /* Initialize my part of the global array and keep local sum */
  mytid = (int *) tid;
  start = (*mytid * ITERATIONS);
  end = start + ITERATIONS;
  printf ("\n[Thread %5d] Doing iterations \t%10d to \t %10d",*mytid,start,end-1); 
  for (i=start; i < end ; i++) {
    a[i] = i * 1.0;
    mysum = mysum + a[i];
    }

  /* Lock the mutex and update the global sum, then exit */
  pthread_mutex_lock (&sum_mutex);
  sum = sum + mysum;
  pthread_mutex_unlock (&sum_mutex);
  pthread_exit(NULL);
}


int main(int argc, char *argv[])
{
  int i, tids[NTHREADS];
  pthread_t threads[NTHREADS];
  pthread_attr_t attr;

  /* Pthreads setup: initialize mutex and explicitly create threads in a
     joinable state (for portability).  Pass each thread its loop offset */
  pthread_mutex_init(&sum_mutex, NULL);
  pthread_attr_init(&attr);
  pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_JOINABLE);
  for (i=0; i<NTHREADS; i++) {
    tids[i] = i;
    pthread_create(&threads[i], &attr, do_work, (void *) &tids[i]);
    }

  /* Wait for all threads to complete then print global sum */ 
  for (i=0; i<NTHREADS; i++) {
    pthread_join(threads[i], NULL);
  }
  printf ("\n[MAIN] Done. Sum= %e\n", sum);

  sum=0.0;
 /* for (i=0;i<ARRAYSIZE;i++){ 
  a[i] = i*1.0;
  sum = sum + a[i]; }
  printf("\n[MAIN] Check Sum= %e",sum);
*/
  /* Clean up and exit */
  pthread_attr_destroy(&attr);
  pthread_mutex_destroy(&sum_mutex);
  pthread_exit (NULL);
}
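
The role of the mutex in the listing above can be isolated in a much smaller sketch. The names `struct counter`, `add_many` and `count_with_mutex` are hypothetical and do not come from the exercise; the limit of 64 threads is an arbitrary assumption to keep the example short. Several threads increment one shared counter, and the lock/unlock pair makes each read-modify-write step indivisible with respect to the other threads.

```c
#include <assert.h>
#include <pthread.h>

/* Hypothetical helper type: a counter together with the mutex that guards it. */
struct counter {
    long value;            /* shared data, written by every thread */
    long increments;       /* how many times each thread adds 1 */
    pthread_mutex_t lock;  /* protects value */
};

/* Thread body: add 1 to the shared counter, one locked step at a time. */
static void *add_many(void *arg)
{
    struct counter *c = arg;
    for (long i = 0; i < c->increments; i++) {
        pthread_mutex_lock(&c->lock);
        c->value = c->value + 1;   /* read, modify, write */
        pthread_mutex_unlock(&c->lock);
    }
    return NULL;
}

/* Run nthreads threads and return the final counter value,
   or -1 if a thread could not be created. */
long count_with_mutex(int nthreads, long increments)
{
    assert(nthreads > 0 && nthreads <= 64);  /* 64 is an arbitrary cap */
    pthread_t threads[64];
    struct counter c = { .value = 0, .increments = increments };
    int created = 0;

    pthread_mutex_init(&c.lock, NULL);
    for (int i = 0; i < nthreads; i++) {
        if (pthread_create(&threads[i], NULL, add_many, &c) != 0)
            break;
        created++;
    }
    for (int i = 0; i < created; i++)
        pthread_join(threads[i], NULL);
    pthread_mutex_destroy(&c.lock);

    return (created == nthreads) ? c.value : -1;
}
```

With the lock in place, `count_with_mutex(4, 100000)` returns 400000. If the two mutex calls in `add_many` are deleted, the increments of different threads can overlap, so the result may come out smaller and can change from run to run; Helgrind is expected to report this as a data race.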
 

Let's illustrate in a practical way the necessity of a mutex and also improve the quality of the code (by removing global variables):

  1. Review the code, compile it with gcc and run it. Measure the runtime with 1, 2, 4 and 8 threads. Do you notice any difference between these cases?

  2. Verify that the code has no concurrency anomalies. You can use the following command to do it: valgrind --tool=helgrind ./multi_thread_loop_mutex

  3. Remove the mutex and check that the Valgrind tool detects the problem. It should point out a "race condition" problem on the shared variable sum.

  4. Modify the example, starting by removing all global variables from the code. That will make the code more portable, because avoiding global variables reduces its complexity.
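
One possible shape for the last step is sketched below. It is not the reference solution of the exercise: `struct shared`, `struct task` and `parallel_sum` are hypothetical names, and giving the leftover elements to the last thread is an assumption that the original code does not need, because there ARRAYSIZE is divisible by NTHREADS. The array, the sum and the mutex live in a structure owned by the caller, and every thread receives a pointer to it together with its own range.

```c
#include <assert.h>
#include <pthread.h>
#include <stdlib.h>

/* State shared by all workers; owned by parallel_sum(), not global. */
struct shared {
    double *a;                  /* the table to fill and add up */
    double sum;                 /* global result, protected by sum_mutex */
    pthread_mutex_t sum_mutex;
};

/* Per-thread argument: the shared state plus this thread's range. */
struct task {
    struct shared *sh;
    size_t start, end;          /* half-open range [start, end) */
};

static void *do_work(void *arg)
{
    struct task *t = arg;
    double mysum = 0.0;

    /* Initialize my part of the array and keep a local sum */
    for (size_t i = t->start; i < t->end; i++) {
        t->sh->a[i] = i * 1.0;
        mysum += t->sh->a[i];
    }

    /* Lock the mutex only for the short update of the shared sum */
    pthread_mutex_lock(&t->sh->sum_mutex);
    t->sh->sum += mysum;
    pthread_mutex_unlock(&t->sh->sum_mutex);
    return NULL;
}

/* Fill a table of n elements with 0..n-1 and return their sum,
   computed by nthreads threads. Returns -1.0 on failure. */
double parallel_sum(size_t n, int nthreads)
{
    assert(n > 0 && nthreads > 0);
    struct shared sh = { .a = malloc(n * sizeof(double)), .sum = 0.0 };
    pthread_t *threads = malloc(nthreads * sizeof *threads);
    struct task *tasks = malloc(nthreads * sizeof *tasks);
    double result = -1.0;
    int created = 0;

    if (sh.a != NULL && threads != NULL && tasks != NULL) {
        size_t chunk = n / nthreads;
        pthread_mutex_init(&sh.sum_mutex, NULL);
        for (int i = 0; i < nthreads; i++) {
            tasks[i].sh = &sh;
            tasks[i].start = i * chunk;
            /* Assumption: the last thread also takes the remainder */
            tasks[i].end = (i == nthreads - 1) ? n : (i + 1) * chunk;
            if (pthread_create(&threads[i], NULL, do_work, &tasks[i]) != 0)
                break;
            created++;
        }
        for (int i = 0; i < created; i++)
            pthread_join(threads[i], NULL);
        if (created == nthreads)
            result = sh.sum;
        pthread_mutex_destroy(&sh.sum_mutex);
    }
    free(tasks);
    free(threads);
    free(sh.a);
    return result;
}
```

Because the number of threads is now a run-time argument, a `main` function can call `parallel_sum(100000000, k)` for k = 1, 2, 4 and 8 without recompiling, which may also simplify the runtime comparison in the first step.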