Vector of struct vs. two 1D vectors

Greetings,

I received a prototype simulation code from a colleague for a bunch of particles in two dimensions (x and y coordinates), and I went and improved it - or so I thought!
There are three 2D quantities that need to be stored for the N particles (position, velocity, force).
In his version, he used two 1D C-arrays of size N for each of these quantities, so in total 6 C-arrays of size N.

In my version, I beautified it and introduced a structure holding two double values like
struct coordinate{
    double x{0};
    double y{0};
};

and then worked with three std::vector<coordinate> of size N to store the quantities.

The main loop looks roughly like this

for (int i=0; i<N_iter; ++i){

    // iterate over force vectors/arrays to
    // update forces

    // iterate over velocity vectors/arrays to
    // update velocities

    // iterate over position vectors/arrays to
    // update positions
    
}


For example, the force update in his version looks like
for (int i=0; i<N; ++i){
  force_x[i] = /* update term */;
  force_y[i] = /* update term */;
}

whereas my version with the struct looks like
for (int i=0; i<N; ++i){
  forces[i].x = /* update term */;
  forces[i].y = /* update term */;
}


Is it possible that his way of doing things is much more efficient?
I ask because his runtime is much shorter (200s vs. 75s), and I am out of ideas where else my bottleneck could be.

Cheers!
What you have shown us is not the bottleneck problem.

Both data structure approaches are fine. The C++ is just nicer to work with as a human.

Have you tried profiling both codes?

The first things I’d look for:
  • unnecessary allocations/deallocations in the C++ code
  • related: unnecessary copies of data in the C++ code (see the sketch after this list)
  • I/O bottlenecks
  • L1 cache issues when manipulating the data
  • algorithm choice
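
To illustrate the copies point with a toy sketch of my own (not taken from your code): passing a std::vector by value silently copies all N elements on every call, while a const reference reads the caller's data in place.

#include <vector>

struct coordinate { double x{0}, y{0}; };

// Copies the whole vector on every call -- easy to do by accident.
double sum_x_by_value(std::vector<coordinate> v){
    double s = 0;
    for (const auto& c : v) s += c.x;
    return s;
}

// Same logic, but no copy: the caller's data is read in place.
double sum_x_by_ref(const std::vector<coordinate>& v){
    double s = 0;
    for (const auto& c : v) s += c.x;
    return s;
}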

Good luck; keep us updated.
Thanks Duthomhas,

I did indeed have an error in my algorithm (I computed the force twice rather than once per iteration).
My code was still much slower, though. I successively changed the structure of my code to resemble my colleague's version, but it remains much slower (110s vs. 90s). This is despite several micro-optimizations on my side (fewer if statements, avoiding redundant computations by storing more constants, `++i` rather than `i++`, fewer function calls in the loop).

There has to be an obvious flaw in my version. The only thing I can think of is the C-array vs. vector-of-struct difference. It would be great if someone could take a look.

Version 1 (colleague's version, runs faster):
#include <math.h>
#include <chrono>
#include <iostream>
#include <random>
#include <fstream>

#define n                   1000           // number of particles
#define gamma               0.05              // friction coefficient
#define tau                 0.0033              // temperature 
#define sigma_2             1.0            // variance of attractive force
#define printskip           10                // which iterations are saved
#define L                   10.               //box length
#define number_of_iterations 10000         //number of iterations
#define dt                  0.1             //timestep


/////////////////////////////////////////////////////////////////////////////
//////////////////////////////////Force//////////////////////////////////////
/////////////////////////////////////////////////////////////////////////////
double distance(double x_1, double x_2){

    // need to do r distance here
    // periodic boundary on [-1,1]
    static double dx;
    dx = x_1 - x_2;
    if (dx > L){
        dx -= 2*L ;
    } 
    else if (dx < -L){
        dx += 2*L;
    }
    return dx;
}


void Force(double x[n],double y[n],double fx[n], double fy[n])
{

    static int i,j;
    static double F_ij;
    static double dx,dy,r2;
    static double kappa = 1./n;

    for (int i = 0; i<n;i++){
        fx[i] = 0.;
        fy[i] = 0.;
    }

    for (int i = 0;i<n;i++){
        for (int j =i+1;j<n;j++){

            // x force
            dx = distance(x[i],x[j]);
            dy = distance(y[i],y[j]);
            r2 = dx*dx + dy*dy;
            F_ij = exp(-r2/(2*sigma_2)); 
            
            fx[i] += -kappa*dx*F_ij/(sigma_2);
            fx[j] += kappa*dx*F_ij/(sigma_2);

            // y force
            fy[i] += -kappa*dy*F_ij/(sigma_2);
            fy[j] += kappa*dy*F_ij/(sigma_2);
        }
    }
    
    
}


//////////////////////////////////////////////////////////////////////////////////
////////////////////////////////Integrator////////////////////////////////////////
//////////////////////////////////////////////////////////////////////////////////

void one_BAOAB_step(double qx[n], double qy[n], double px[n], double py[n], double fx[n], double fy[n], std:: mt19937& RNG)
{
    
	static std:: normal_distribution<> normal{0,1};


    static int i,j,k;
    static float c;

    c = exp(-gamma*dt);
    
    
    //B
    for (int i = 0; i < n;i++){
        px[i] = px[i] + 0.5*dt*fx[i];
        py[i] = py[i] + 0.5*dt*fy[i];
    }



    //A
    for (int i = 0; i < n;i++){
        qx[i] = qx[i] + 0.5*dt*px[i];
        if (qx[i] > L){
            qx[i] = qx[i] - 2.*L;
        }
        if (qx[i] < -L){
            qx[i] = qx[i] + 2.*L;
        }
        qy[i] = qy[i] + 0.5*dt*py[i];
        if (qy[i] > L){
            qy[i] = qy[i] - 2.*L;
        }
        if (qy[i] < -L){
            qy[i] = qy[i] + 2.*L;
        }
    }



    //O
    for (int i = 0; i < n;i++){
        px[i] = c*px[i] + sqrt((1 - c*c)*tau)*normal(RNG);
        py[i] = c*py[i] + sqrt((1 - c*c)*tau)*normal(RNG);
    }


    //A
    for (int i = 0; i < n;i++){
        qx[i] = qx[i] + 0.5*dt*px[i];
        if (qx[i] > L){
            qx[i] = qx[i] - 2.*L;
        }
        if (qx[i] < -L){
            qx[i] = qx[i] + 2.*L;
        }
        qy[i] = qy[i] + 0.5*dt*py[i];
        if (qy[i] > L){
            qy[i] = qy[i] - 2.*L;
        }
        if (qy[i] < -L){
            qy[i] = qy[i] + 2.*L;
        }
    }

    //update force
    Force(qx,qy,fx,fy);

    //B
    for (int i = 0; i < n;i++){
        px[i] = px[i] + 0.5*dt*fx[i];
        py[i] = py[i] + 0.5*dt*fy[i];
    }

}

//////////////////////////////////////////////////////////////////////////////////
///////////////////////////////////////MAIN///////////////////////////////////////
//////////////////////////////////////////////////////////////////////////////////

int main(int argc, const char * argv[]) {
    
    static double qx[n], px[n], fx[n], qy[n], py[n], fy[n];
    
    std:: mt19937 twister;
    int randomseed = 2;
    std:: seed_seq seq{1,20,3200,403,5*randomseed+1,12000,73667,9474+randomseed,19151-randomseed};
    std:: vector < std::uint32_t > seeds(1);
    seq.generate(seeds.begin(), seeds.end());
    twister.seed(seeds.at(0)); 

    
    // initialize the trajectory uniformly
    static double ii = 0.,jj = 0.;
    for (int i = 0; i<n;i++){
        px[i] = 0.;
        py[i] = 0.;
        
    }
        // Set positions.
    std:: uniform_real_distribution<double> box_uniform(-L, L);
    for (int i=0; i<n; ++i){
        qx[i] = box_uniform(twister);
        qy[i] = box_uniform(twister);
    }

    
    Force(qx,qy,fx,fy);  // updates forces fx, fy
    
    // ############ MAIN LOOP ##################################
    auto t1 = std:: chrono::high_resolution_clock::now();
    
    for (int num = 0; num<=number_of_iterations; num++)
        { 
            one_BAOAB_step(qx,qy,px,py,fx,fy, twister);
            if (num % 1000==0) std::cout << "Iteration "<<num<< " done!\n";

        }
    
    auto t2 = std:: chrono:: high_resolution_clock:: now();
    auto ms_int = std:: chrono:: duration_cast < std:: chrono:: seconds > (t2 - t1);
    std:: cout << "Execution took " << ms_int.count() << " seconds!\n"; 

    return 0;

}

Version 2 (my version, runs slower but should be faster IMO):
#include <iostream>
#include <vector>
#include <cmath>
#include <random>
#include <fstream>
#include <chrono>

#define L 10
#define N_iter 10000
#define n_meas 10
#define n_part 1000
#define beta 300
#define gamma 0.05
#define kappa 1./n_part
#define sigma_2 1
#define stepsize 0.1
#define randomseed 1

struct coordinate{
    double x{0};
    double y{0};
};


void compute_force_naive(std:: vector<coordinate>& forces, const std:: vector<coordinate>& positions){

    static double dx, dy;
    static double dist2;
    static double force;
    static double two_L = 2*L;
    static double pref {-kappa/(2*sigma_2)};
    static double expo;

    for (int i=0; i<n_part; ++i){
        forces[i].x = 0;
        forces[i].y = 0;
    }

    for (int i=0; i<n_part; ++i){
        for(int j=i+1; j<n_part; ++j){
            
            dx = positions[i].x - positions[j].x;
            dx = dx > L ? dx-two_L : (dx < -L ? dx+two_L : dx);

            dy = positions[i].y - positions[j].y;
            dy = dy > L ? dy-two_L : (dy < -L ? dy+two_L : dy);

            dist2 = dx*dx + dy*dy;

            expo = exp(-dist2/(2*sigma_2));
     
            force = pref*expo;

            forces[i].x += force*dx;
            forces[i].y += force*dy;
            forces[j].x += -force*dx;
            forces[j].y += -force*dy;
        }
    }

    return;

}




int main(int argc, char* argv[]){

    // Prepare RNG.
    std:: mt19937 twister;    
    std:: seed_seq seq{1,20,3200,403,5*randomseed+1,12000,73667,9474+randomseed,19151-randomseed};
    std:: vector < std::uint32_t > seeds(1);
    seq.generate(seeds.begin(), seeds.end());
    twister.seed(seeds.at(0)); 

    // Set positions.
    std:: uniform_real_distribution<double> box_uniform(-L, L);
    std:: vector <coordinate> positions(n_part);
    for (int i=0; i<n_part; ++i){
        positions[i].x = box_uniform(twister);
        positions[i].y = box_uniform(twister);
    }

    std:: vector <coordinate> forces(n_part);
    compute_force_naive(forces, positions);

    std:: vector <coordinate> velocities(n_part);


    //################## MAIN LOOP ################################.
    const double two_L = 2*L;  // some constants
    const double stepsize_half = 0.5 * stepsize;   
    std:: normal_distribution<> normal{0,1};
    static const double a = exp(-gamma*stepsize);
    static const double pref = sqrt(1/beta *(1-a*a));
    
    auto t1 = std:: chrono::high_resolution_clock::now();
    for (int i=0; i<=N_iter; ++i){
        
        // B
        // Update velocities.
        for (int i=0; i<n_part; ++i){
            velocities[i].x += stepsize_half*forces[i].x;
            velocities[i].y += stepsize_half*forces[i].y;
        }

        // A
        for (int i=0; i<n_part; ++i){
            positions[i].x += stepsize_half*velocities[i].x;
            positions[i].x = positions[i].x>L ? positions[i].x - two_L : (positions[i].x<-L ? positions[i].x + two_L : positions[i].x);

            positions[i].y += stepsize_half*velocities[i].y;
            positions[i].y = positions[i].y>L ? positions[i].y - two_L : (positions[i].y<-L ? positions[i].y + two_L : positions[i].y);
        }

        // O
        for (int i=0; i<n_part; ++i){
            velocities[i].x = a*velocities[i].x + pref*normal(twister); 
            velocities[i].y = a*velocities[i].y + pref*normal(twister); 
        }

        // A
        for (int i=0; i<n_part; ++i){
            positions[i].x += stepsize_half*velocities[i].x;
            positions[i].x = positions[i].x>L ? positions[i].x - two_L : (positions[i].x<-L ? positions[i].x + two_L : positions[i].x);

            positions[i].y += stepsize_half*velocities[i].y;
            positions[i].y = positions[i].y>L ? positions[i].y - two_L : (positions[i].y<-L ? positions[i].y + two_L : positions[i].y);
        }
        
        // B
        // Get forces.
        compute_force_naive(forces, positions);
        // Update velocities.
        for (int i=0; i<n_part; ++i){
            velocities[i].x += stepsize_half*forces[i].x;
            velocities[i].y += stepsize_half*forces[i].y;
        }

        if (i%1000==0) std::cout << "Iteration "<<i<< " done!\n";

    }
    
    auto t2 = std:: chrono:: high_resolution_clock:: now();
    auto ms_int = std:: chrono:: duration_cast < std:: chrono:: seconds > (t2 - t1);
    std:: cout << "Execution took " << ms_int.count() << " seconds!\n";
    
    return 0;

}
Creating a 2D vector isn't all that difficult, especially when you know the row and column sizes you want up front.

std::vector<std::vector<int>> vec(row_size, std::vector<int>(col_size));

Should your size needs change it is easy to add and/or remove elements on the fly.

Add in a few template functions and you can output 1D and 2D vectors as easily as a regular single element variable:

#include <iostream>
#include <vector>
#include <numeric>

template<typename T>
void Display2DVector(const std::vector<std::vector<T>>&);

template <typename T>
std::ostream& operator<<(std::ostream&, const std::vector<std::vector<T>>&);

template <typename T>
std::ostream& operator<<(std::ostream&, const std::vector<T>&);

template<typename T>
void Get2DVectorValues(std::vector<std::vector<T>>&);

int main()
{
   std::cout << "Creating a 2-dimensional vector, enter row size: ";
   unsigned row_size;
   std::cin >> row_size;

   std::cout << "Enter column size: ";
   unsigned col_size;
   std::cin >> col_size;

   std::cout << "\n";

   // create a 2 dimensional int vector with known dimensions
   std::vector<std::vector<int>> aVector(row_size, std::vector<int>(col_size));

   std::cout << "Let's verify the sizes of the 2D vector....\n";
   std::cout << "The vector has " << aVector.size() << " rows.\n";
   std::cout << "The vector has " << aVector[0].size() << " columns.\n\n";

   // display the empty 2D vector
   std::cout << "Displaying the empty 2D vector:\n";
   Display2DVector(aVector);
   std::cout << '\n';

   std::cout << "Initializing the 2D vector with some values....\n";
   // initialize the vector with some values other than zero
   int start = 101;
   int offset = 100;

   for (auto& itr : aVector)  // remember the &!!!!!!!!!!!!!!!!!!!!!
   {
      std::iota(itr.begin(), itr.end(), start);
      start += offset;
   }

   // let's display the filled 2D vector
   std::cout << "Displaying the filled 2D vector:\n";
   Display2DVector(aVector);
   std::cout << '\n';

   std::cout << "Let's try that again....\nDisplaying the filled 2D vector:\n";
   std::cout << aVector;
   std::cout << '\n';

   std::cout << "Adding a value to the end of the 2nd row.\n";
   aVector[1].push_back(999);

   Display2DVector(aVector);
   std::cout << '\n';

   std::cout << "Let's try that again....\nDisplaying the filled 2D vector:\n";
   std::cout << aVector;
   std::cout << '\n';

   // create a smaller vector
   std::vector<std::vector<int>> aVec2(2, std::vector<int>(2));

   std::cout << "Let's get some values for a 2 x 2 vector:\n";
   Get2DVectorValues(aVec2);
   std::cout << '\n';


   std::cout << "Let's display the new values:\n";
   std::cout << aVec2;
}

template<typename T>
void Display2DVector(const std::vector<std::vector<T>>& aVec)
{
   for (unsigned rows = 0; rows < aVec.size(); rows++)
   {
      for (unsigned cols = 0; cols < aVec[rows].size(); cols++)
      {
         std::cout << aVec[rows][cols] << ' ';
      }
      std::cout << '\n';
   }

   // an easier method for looping through and displaying a 2D vector
   // without referencing the rows and columns
   /*
   for (const auto& row_itr : aVec)
   {
      for (const auto& col_itr : row_itr)
      {
         std::cout << col_itr << ' ';
      }
      std::cout << '\n';
   }
   */
}

template <typename T>
std::ostream& operator<<(std::ostream& os, const std::vector<std::vector<T>>& v)
{
   for (auto const& x : v)
   {
      os << x << '\n';
   }

   return os;
}


template <typename T>
std::ostream& operator<<(std::ostream& os, const std::vector<T>& v)
{
   for (auto const& x : v)
   {
      os << x << ' ';
   }

   return os;
}

template<typename T>
void Get2DVectorValues(std::vector<std::vector<T>>& aVec)
{
   for (size_t rows = 0; rows < aVec.size(); rows++)
   {
      for (size_t cols = 0; cols < aVec[0].size(); cols++)
      {
         std::cout << "[" << rows << "][" << cols << "]: ";
         int input;
         std::cin >> input;
         aVec[rows][cols] = input;
      }
   }
}


This is a rather ancient pre-C++20 example I whipped together years ago. I might do things differently now. But it does illustrate how to manipulate a 2D vector.
PhysicsIsFun wrote:
... but it remains much slower (110s vs. 90s).


Surely you mean milliseconds rather than seconds? Even then that is very slow for only 1'000 data points and such simple math. Maybe it's nanoseconds? To give an example, I use a geodetic library to convert 1'000'000 grid points to lat/long, which is a fairly complicated computation; on an ordinary i7 chip, single threaded, it takes 0.96 seconds. I am also obliged to ask, given it is SO SLOW, whether you have compiled a release version, not debug?

I notice n_part == N == 1000.

The outer for loop on line 99 of your code contains 4 inner loops (BAOAB), each of which loops over the entire container, then the call to compute_force_naive (which loops through the entire container to zero the forces and then has a nested pair of for loops), then another inner for loop. So that is about O(6N³).

In your colleague's code this is not happening.

Are you able to use C++17 or later? The std::for_each algorithm has execution-policy options to automatically multi-thread and vectorise loops.
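
For what it's worth, here is a minimal sketch (mine, not from the thread) of that idea applied to one of the per-particle loops. The coordinate struct and stepsize_half are taken from the code above; with GCC you typically need to link against TBB for the parallel policies to actually run in parallel.

#include <algorithm>
#include <execution>
#include <vector>

struct coordinate { double x{0}, y{0}; };

void b_step_parallel(std::vector<coordinate>& velocities,
                     const std::vector<coordinate>& forces,
                     double stepsize_half)
{
    // Each element is written by exactly one iteration, so there is no
    // loop-carried dependency and the loop is safe to run unsequenced/parallel.
    std::for_each(std::execution::par_unseq, velocities.begin(), velocities.end(),
                  [&](coordinate& v){
                      const std::size_t i = &v - velocities.data();  // index of this element
                      v.x += stepsize_half * forces[i].x;
                      v.y += stepsize_half * forces[i].y;
                  });
}

The force loop is the part that would really pay off, but as discussed further down it carries a dependency on forces and needs restructuring before it can be parallelized like this.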

Hope this helps :+)
Surely you mean milliseconds rather than seconds?


No. It really is seconds. On my laptop V1 takes 98 seconds and V2 takes 112 seconds.

I suspect this could be down to the memory cache. Have a look at:
https://stackoverflow.com/questions/16699247/what-is-a-cache-friendly-code
https://johnnysswlab.com/make-your-programs-run-faster-by-better-using-the-data-cache/

PS. Why do you need all the various inner loops in main()? Can't the first 4 just be combined into 1 loop?
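
For illustration, here is a rough, untested sketch (mine) of what combining those first four loops could look like; the names (positions, velocities, forces, stepsize_half, L, a, pref, normal, twister) are the ones from version 2. Note that the RNG draws then happen per particle instead of in a separate pass, so the exact trajectory will differ from the original run even though the scheme is the same.

#include <random>
#include <vector>

struct coordinate { double x{0}, y{0}; };

void baoa_fused(std::vector<coordinate>& positions,
                std::vector<coordinate>& velocities,
                const std::vector<coordinate>& forces,
                double stepsize_half, double L, double a, double pref,
                std::normal_distribution<>& normal, std::mt19937& twister)
{
    const double two_L = 2 * L;
    auto wrap = [&](double q){              // periodic boundary on [-L, L]
        return q > L ? q - two_L : (q < -L ? q + two_L : q);
    };

    for (std::size_t i = 0; i < positions.size(); ++i){
        // B: half kick from the current forces
        velocities[i].x += stepsize_half * forces[i].x;
        velocities[i].y += stepsize_half * forces[i].y;

        // A: half drift + wrap
        positions[i].x = wrap(positions[i].x + stepsize_half * velocities[i].x);
        positions[i].y = wrap(positions[i].y + stepsize_half * velocities[i].y);

        // O: friction + noise
        velocities[i].x = a * velocities[i].x + pref * normal(twister);
        velocities[i].y = a * velocities[i].y + pref * normal(twister);

        // A: second half drift + wrap
        positions[i].x = wrap(positions[i].x + stepsize_half * velocities[i].x);
        positions[i].y = wrap(positions[i].y + stepsize_half * velocities[i].y);
    }
    // The force update and the final B half kick still need their own loops,
    // because the force couples all particle pairs.
}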

Also, I'd suggest using a different variable from i on lines 99 and 141, as you use i in the inner loops for a different purpose. It's correct as it is, but it is confusing to read.
seeplus wrote:
No. It really is seconds. On my laptop V1 takes 98 seconds and V2 takes 112 seconds.


Yeah, I thought about that a bit more: there are 1 million items to calculate, but the call to compute_force_naive inside another for loop means that stuff is being calculated on the order of 1 billion times. With the library I use to calculate geodetic coordinates, I mentioned it takes 0.96 seconds to do 1 million items; when I increase that to 1 billion, it takes 70 seconds.

seeplus wrote:
PS. Why do you need all the various inner loops in main()? Can't the first 4 just be combined into 1 loop?


I agree: those BAOAB items seem to be just attributes of the point, so why not make a struct that has x, y, B, A, O, A, B, but with better names so we can see what they mean? Then have a single nested loop to process all of it. By the way, the code that was in each of those for loops could be in a function.
I'll be the reality checker here... 20 seconds is not that much real time. People wait longer for a web page to load sometimes.

How fast does it need to be? I mean, cutting it in half would be meaningful, of course, but it's still going to take a minute to run, and whoever kicked it off is going to alt-tab away and go check their email or get a snack or something.

Looking at version 2 above, my PC took 48 seconds. You need to upgrade if it took 3 times that. Hardware aside... poking at it a little... there's not much to be done. It seems like there should be a better way to zero out the forces each time you call the function.

Meh, I managed to shave 2 seconds off by rearranging stuff, but that's no answer. It needs something new and different. The biggest impact on my system was moving positions[i] to a constant in the outer loop. The compiler should have done that without the help.
On my desktop machine, using the latest point release of MSVC, program 1 completes in 55 seconds and program 2 completes in 58 seconds. After program 2 is modified by getting rid of the unnecessary static on lines 27-32, it takes 55 seconds too.

If you want to parallelize the program, you have to be careful since the inner loop has a loop-carried dependency on forces. Each element is read, modified, and written n_part - 1 times by different iterations of the inner loop. One crude but simple way to proceed is to notice the inner loop runs only 499500 times, so the nested loops can be fused and all of the intermediate results can be stored in an array.

This loop fusion changes the nested inner loop from
for (int i = 0; i < n_part; ++i)
  for (int j = i + 1; j < n_part; ++j)
  {
    // ...
  }

Into
for (int x = 0; x < (n_part - 1) * n_part / 2; ++x)
{
  int i = x / n_part;
  int j = x % n_part;

  if (i > j)
  {
    i = n_part - i - 2;
    j = n_part - j - 1;
  }

  // ...
}


We can use this pattern to rearrange the loops into something like this:
struct intermediate_result { double dx, dy, force; };
std::vector<intermediate_result> intermediate_results((n_part - 1) * n_part / 2);
static void compute_force_naive2(std::vector<coordinate>& forces, const std::vector<coordinate>& positions) 
{
  double constexpr two_L = 2 * L;
  double constexpr pref{ -kappa / (2 * sigma_2) }; 

  for (int i = 0; i < n_part; ++i) {
    forces[i].x = 0;
    forces[i].y = 0;
  }

  int const x_max = intermediate_results.size();
  // #pragma omp parallel for
  for (int x = 0; x < x_max; ++x)
  {
    int i = x / n_part;
    int j = x % n_part;

    if (i > j)
    {
      i = n_part - i - 2;
      j = n_part - j - 1;
    }

    double dx = positions[i].x - positions[j].x;
    if (dx > +L) dx -= two_L;
    if (dx < -L) dx += two_L;

    double dy = positions[i].y - positions[j].y;
    if (dy > +L) dy -= two_L;
    if (dy < -L) dy += two_L;

    intermediate_results[x].force = pref * exp(-(dx * dx + dy * dy) / (2 * sigma_2));
    intermediate_results[x].dx = dx;
    intermediate_results[x].dy = dy;
  }
  
  for (int x = 0; x < x_max; ++x)
  {
    int i = x / n_part;
    int j = x % n_part;

    if (i > j)
    {
      i = n_part - i - 2;
      j = n_part - j - 1;
    }

    forces[i].x += +intermediate_results[x].force * intermediate_results[x].dx;
    forces[i].y += +intermediate_results[x].force * intermediate_results[x].dy;

    forces[j].x += -intermediate_results[x].force * intermediate_results[x].dx;
    forces[j].y += -intermediate_results[x].force * intermediate_results[x].dy;
  }
}

After measurement I found that roughly 82% of the time is spent in the second of the three loops, while essentially all remaining time is spent in the third. The program as a whole takes 77 seconds to execute (that's more than the 58s for the unmodified program, since it's doing extra work now).

Amdahl's Law says that with 32 threads running the second loop we can expect at best a 4.9x speedup in the final product, which means the theoretical best execution time would be 77s / 4.9 = 15.7s. If I give the second loop to OpenMP by putting
#pragma omp parallel for
on line 14, the program executes in 18s, which is roughly within 10% of the theoretical best.
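
For reference, the 4.9x figure is just Amdahl's formula with the measured fraction plugged in (assuming p ≈ 0.82 of the runtime is parallelizable and s = 32 threads):

speedup_max = 1 / ((1 - p) + p/s) = 1 / (0.18 + 0.82/32) ≈ 4.9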

A less crude way to split the work is to give each thread its own n_part array of forces and collect them at the end. This is more complicated but faster:

static void compute_force_naive3(std::vector<coordinate>& forces, const std::vector<coordinate>& positions)
{
  int const n_tasks = std::thread::hardware_concurrency();

  static std::vector<std::vector<coordinate>> forces_for_all_tasks(
    n_tasks, std::vector<coordinate>(n_part, coordinate { .x = 0.0, .y = 0.0}));
  static std::vector<std::future<void>> futures;
    
  int const iterations = ((n_part - 1) * n_part) / 2;
  int const iterations_per_task = iterations / n_tasks;
  int const extra_iterations_for_last_task = iterations % iterations_per_task;

  for (int task_number = 0; task_number < n_tasks; ++task_number)
  {
    int task_x_begin = iterations_per_task * task_number;
    int task_x_end   = iterations_per_task * task_number + iterations_per_task;

    if (task_number == n_tasks - 1) task_x_end += extra_iterations_for_last_task;

    auto const task = [task_x_begin, task_x_end, task_number, &positions, &forces]()
      {
        std::vector<coordinate>& forces_for_this_task = forces_for_all_tasks[task_number];

        std::fill(forces_for_this_task.begin(), forces_for_this_task.end(), coordinate{ .x = 0.0, .y = 0.0 });

        for (int x = task_x_begin; x < task_x_end; ++x)
        {
          int i = x / n_part;
          int j = x % n_part;

          if (i > j)
          {
            i = n_part - i - 2;
            j = n_part - j - 1;
          }

          double dx = positions[i].x - positions[j].x;
          dx = dx > L ? dx - two_L : (dx < -L ? dx + two_L : dx);

          double dy = positions[i].y - positions[j].y;
          dy = dy > L ? dy - two_L : (dy < -L ? dy + two_L : dy);

          double dist2 = dx * dx + dy * dy;

          double expo = exp(-dist2 / (2 * sigma_2));

          double force = pref * expo;

          forces_for_this_task[i].x += force * dx;
          forces_for_this_task[i].y += force * dy;
          forces_for_this_task[j].x += -force * dx;
          forces_for_this_task[j].y += -force * dy;
        }
      };

    futures.emplace_back(std::async(std::launch::async, task));
  }
  
  // Destroy all the futures.
  // This blocks until all tasks are complete.
  futures.clear(); 

  // Sum all of the task-specific forces into the output parameter.
  std::fill(forces.begin(), forces.end(), coordinate{ .x = 0.0, .y = 0.0 });
  for (auto const& forces_for_task : forces_for_all_tasks)
    for (int i = 0; i < n_part; ++i)
    {
      forces[i].x += forces_for_task[i].x;
      forces[i].y += forces_for_task[i].y;
    }
}


It completes in 7 seconds on my machine. Measurement shows that 60% of execution is still spent in std::exp, while about 20% is spent randomly accessing forces_for_this_task on lines 49-52. It seems likely that this is a cache issue. The remaining 20% of execution time is scattered throughout the rest of the lambda on lines 20-48.

There is still room for improvement:

It might be possible to delegate the work differently, so that i and j increase together along the diagonals of this triangular loop so that forces_for_this_task is accessed in order.
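
To make that concrete, here is one untested sketch (mine) of such a pair enumeration by cyclic offset: each unordered pair is visited exactly once, i runs sequentially, and j sweeps through at most two contiguous ranges per offset, so the per-task force arrays are accessed much more regularly. It assumes an even particle count, as in the thread.

#include <cstdio>

// Visit every unordered pair {i, j} of n items exactly once, grouped by cyclic
// offset d, so both indices move through contiguous ranges (n assumed even).
void visit_all_pairs(int n)
{
    long long pairs = 0;
    for (int d = 1; d <= n / 2; ++d)
    {
        // For even n, the offset d == n/2 pairs each item with the one "opposite"
        // it; letting i run over the full range would visit each such pair twice,
        // so only the first half of i is used for that offset.
        const int i_max = (2 * d == n) ? n / 2 : n;
        for (int i = 0; i < i_max; ++i)
        {
            const int j = (i + d) % n;
            // ... the (i, j) pair interaction would go here
            (void)j;
            ++pairs;
        }
    }
    std::printf("%lld pairs visited\n", pairs);   // n*(n-1)/2 = 499500 for n = 1000
}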

Additionally the "left-over" work (extra_iterations_for_last_task) could be distributed evenly over all the tasks, but that's very minor.
As a baseline, your version 3 took 13 sec on mine, compiled with g++ under Cygwin.

Also interesting: if I removed the dx= and dy= lines that wrap the distances based on L and -L, the single-threaded version ran 10 sec faster for me, but on yours there was no effect. I realize they need to be there; it was a 'how costly is this' test only.

More interesting: playing with n_tasks, lowering it improves things on mine (your value = 20 on my box).
1 -> 58 sec
4 -> 17 sec
6 -> 13 sec
8 -> 11 sec
10 -> 13 sec
12 -> 12 sec
14 -> 12 sec
and so on, 12-13 sec for the rest up to 20; above 20 it slows down further.

I am not sure exp can be improved or factored out. I looked at that, but I don't think so. All I can think of is, if it is called with the same values a lot, whether it could recognize that... or if you could change the base so it uses base 2 and somehow adjust for that later on... but I am not sure that can be done either. I love/hate logs... stuff like this makes my brain ache.
you can borrow from this stuff:
https://deathandthepenguinblog.wordpress.com/2015/04/13/writing-a-faster-exp/
but beware accuracy loss.
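
As a small illustration of the change-of-base idea (my own sketch; whether it is actually faster depends entirely on the libm, so measure before trusting it): exp(x) is identical to 2^(x * log2(e)), so the call can be routed through std::exp2.

#include <cmath>

// exp(x) == 2^(x * log2(e)); the identity itself loses no accuracy, only a
// faster approximation of exp2 substituted later would.
inline double exp_via_exp2(double x)
{
    constexpr double log2e = 1.4426950408889634;  // log2(e)
    return std::exp2(x * log2e);
}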
There is a degree to which heavy optimization is useful, but simple scientific computing like this is usually more concerned with accurate results than with shaving a few seconds here or there. As already mentioned, running a small and simple physical system for 20 seconds is trivial. A scientist in the field would be happy if it took less than an hour to complete something he just typed up.

If more power and/or bigger data is needed, there exist programs that do physical simulations with much more accuracy and speed than a small C/C++/Python/R/whatever program.

Alas, I’ve long ago lost the desire to analyze and improve random algorithms found on the internet. You lot are better than me for that.
Playing with n_tasks... lowering it is improving mine (your value = 20 on my box).

Same for me, I can cut n_tasks in half for a small speedup, then the execution time starts to increase.

Maybe this is because each pair of tasks has to share a CPU cache, given that there's only one cache and two threads per physical core.
Hi guys,

thank you all for taking the time to look into this. I am happy to see that I did not make any really dumb mistakes that slow down my code lol.

We will have to run the code with more iterations and particles, so efficiency is worth looking at.
The main reason I opened this thread is that I did not understand why version 1 runs faster, but apparently it comes down to cache misses in version 2, or to what the compiler does with it?

As I am not experienced with OpenMP, I only wanted to code up a serial prototype for now. I thought it would be easy to parallelize the force loop afterwards, but according to @mbozzi the force routine needs to be written accordingly, so maybe I should write it in parallel right away.
Btw, how expensive are function calls in a loop? In my own original version, I used functions for the A steps and so on, like:

    for (int i=0; i<=N_iter; ++i){

        B_step(positions, velocities, forces, stepsize_half, L, kappa, sigma_2);
        A_step(positions, velocities, stepsize_half, L);
        apply_periodic_boundaries(positions, L);
        O_step(velocities, stepsize, gamma, beta, twister);
        A_step(positions, velocities, stepsize_half, L);
        apply_periodic_boundaries(positions, L);
        compute_force(forces, positions, L, kappa, sigma_2);
        B_step(positions, velocities, forces, stepsize_half, L, kappa, sigma_2);

        if (i%1000==0) std::cout << "Iteration "<<i<< " done!\n";

    }

where all the arguments are passed by reference. This greatly increases modularity, but I thought the many function calls were the reason for my code being slower than the other one (that's why version 2 above does not have the functions anymore).
If the compiler does not choose to inline them - which it will if it makes performance sense and is possible - millions of calls will add up. It's not that expensive once, but doing it repeatedly involves:
two jumps
shuffling stuff around (registers, mostly)
non-reference parameters (and non-constants) can also add to it, if any

.... the code here isn't doing much. It's the memory layout and access plus the sheer number of calls that is "slow". If you can improve the memory page faults/caching, or are willing to use a faster approximation of exp, you might drive the values down a little. Rearranging the code to suit your specific compiler and hardware may also get you a little bit.

If you have them anyway, why not try it with the functions? Learn your compiler flags, set the inline options to most generous (for Visual Studio it used to be called "any suitable", but I don't know what gcc calls it), then back it off and see which setting, if any, shows some difference. At some point more functions is less readable and not easy to follow. You are talking like 10 lines of code; writing 5 functions to split that up means the reader has to go study each one in turn. I see that most of them happen twice, so that almost makes sense, especially if you use them elsewhere, but it's borderline whether it's worth it here if you don't use them again.
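
If you want to experiment with that, here is a minimal sketch of mine (GCC/Clang syntax; MSVC spells it __forceinline). With -O2/-O3 a small helper like this is normally inlined anyway; the attribute just makes the request explicit, and -Winline will warn when an inline request could not be honored.

#include <vector>

struct coordinate { double x{0}, y{0}; };

// 'inline' plus the attribute asks the compiler to paste the body at each call
// site instead of emitting a call; verify the effect by profiling, not by faith.
__attribute__((always_inline)) inline
void a_step(std::vector<coordinate>& positions,
            const std::vector<coordinate>& velocities,
            double stepsize_half)
{
    for (std::size_t i = 0; i < positions.size(); ++i){
        positions[i].x += stepsize_half * velocities[i].x;
        positions[i].y += stepsize_half * velocities[i].y;
    }
}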
Ok, I tested using functions.
Versions 1 and 2 above need 80-85s for 20k iterations.

If I use functions and the most aggressive gcc optimization flag -O3, it runs for 103s, which is about 20% slower. Note that the force routine here also calls two functions in each iteration. If I replace that routine with the compute_force_naive routine from version 2 above (so that the force routine itself does not call further functions), the runtime is reduced to 94s.

It seems like the function calls in the loop are the problem then, and the compiler does not inline anything...

Version 0 (with functions):
#include <iostream>
#include <vector>
#include <cmath>
#include <random>
#include <fstream>
#include <chrono>


struct coordinate{
    double x{0};
    double y{0};
};


double compute_sq_distances(const coordinate& pos1, const coordinate& pos2, const double& L, 
                            double& dx, double& dy){

    // box volume = [-L,L]^n

    static double two_L = 2*L;
    dx = pos1.x - pos2.x;
    dx = dx > L ? dx-two_L : (dx < -L ? dx+two_L : dx);

    dy = pos1.y - pos2.y;
    dy = dy > L ? dy-two_L : (dy < -L ? dy+two_L : dy);

    return dx*dx + dy*dy;

}


double force_term(double& dist2, const double& kappa, const double& sigma_2){

    static double pref {-kappa/(2*sigma_2)};
    static double expo;
    expo = exp(-dist2/(2*sigma_2));
     
    return pref*expo;

}


void compute_force(std:: vector<coordinate>& forces, const std:: vector<coordinate>& positions, 
                   const double& L, const double& kappa, const double& sigma_2){

    static double dx, dy;
    static double dist2;
    static int n_part = forces.size();
    static double force;

    for (int i=0; i<n_part; ++i){
        forces[i].x = forces[i].y = 0;
    }

    for (int i=0; i<n_part; ++i){
        for(int j=i+1; j<n_part; ++j){
            dist2 = compute_sq_distances(positions[i], positions[j], L, dx, dy);
            force = force_term(dist2, kappa, sigma_2);
            forces[i].x += force*dx;
            forces[i].y += force*dy;
            forces[j].x += -force*dx;
            forces[j].y += -force*dy;
        }
    }


    return;

}


void B_step(const std:: vector <coordinate>& positions, std:: vector <coordinate>& velocities, std:: vector <coordinate>& forces,
            const double& stepsize, const double& L, const double& kappa, const double& sigma_2){

    static const int n_part = positions.size();

    // Update velocities.
    for (int i=0; i<n_part; ++i){
        velocities[i].x += stepsize*forces[i].x;
        velocities[i].y += stepsize*forces[i].y;
    }

    return;

}

void A_step(std:: vector <coordinate>& positions, const std:: vector <coordinate>& velocities, const double& stepsize, const double& L){

        static int n_part = positions.size();
        for (int i=0; i<n_part; ++i){
            positions[i].x += stepsize*velocities[i].x;
            positions[i].y += stepsize*velocities[i].y;
        }

    return;
}

void O_step(std:: vector <coordinate>& velocities, const double& stepsize,
            const double& gamma, const double& beta, std:: mt19937& RNG){

    static std:: normal_distribution<> normal{0,1};
    static const int n_part = velocities.size();
    static const double a = exp(-gamma*stepsize);
    static const double pref = sqrt(1/beta *(1-a*a));

    for (int i=0; i<n_part; ++i){
        velocities[i].x = a*velocities[i].x + pref*normal(RNG); 
        velocities[i].y = a*velocities[i].y + pref*normal(RNG); 
    }

    return;
    
    }


void apply_periodic_boundaries(std:: vector <coordinate>& positions, const double& L){

    static const int n_part = positions.size();
    static const double two_L = 2*L;
    static double x, y;

    for (int i=0; i<n_part; ++i){
        x = positions[i].x;
        positions[i].x = x>L ? x-two_L : (x<-L ? x + two_L : x);
        y = positions[i].y;
        positions[i].y = y>L ? y-two_L : (y<-L ? y + two_L : y);
    }

    return;

}



int main(int argc, char* argv[]){

    // Hyperparameters.
    const double L {10};
    const int N_iter {20000};
    const int n_meas {10};
    const int n_part {1000};
    const double beta {300};
    const double gamma {0.05};
    const double kappa {1./n_part};
    const double sigma_2 {1};
    const double stepsize {0.1};
    const int randomseed {1};


    // Prepare simulation.
    // Prepare RNG.
    std:: mt19937 twister;
    
    std:: seed_seq seq{1,20,3200,403,5*randomseed+1,12000,73667,9474+randomseed,19151-randomseed};
    std:: vector < std::uint32_t > seeds(1);
    seq.generate(seeds.begin(), seeds.end());
    twister.seed(seeds.at(0)); 

    // Set positions.
    std:: uniform_real_distribution<double> box_uniform(-L, L);
    std:: vector <coordinate> positions(n_part);
    for (int i=0; i<n_part; ++i){
        positions[i].x = box_uniform(twister);
        positions[i].y = box_uniform(twister);
    }

    // Set forces.
    std:: vector <coordinate> forces(n_part);
    compute_force(forces, positions, L, kappa, sigma_2);
    
    // Set velocities.
    std:: vector <coordinate> velocities(n_part);

    // Main loop.
    const double stepsize_half = 0.5 * stepsize;
    std:: normal_distribution<> normal{0,1};
    
    auto t1 = std:: chrono::high_resolution_clock::now();
    for (int i=0; i<=N_iter; ++i){
        
        B_step(positions, velocities, forces, stepsize_half, L, kappa, sigma_2);
        A_step(positions, velocities, stepsize_half, L);
        apply_periodic_boundaries(positions, L);
        O_step(velocities, stepsize, gamma, beta, twister);
        A_step(positions, velocities, stepsize_half, L);
        apply_periodic_boundaries(positions, L);
        compute_force(forces, positions, L, kappa, sigma_2);
        B_step(positions, velocities, forces, stepsize_half, L, kappa, sigma_2);

        if (i%1000==0) std::cout << "Iteration "<<i<< " done!\n";

    }
    
    auto t2 = std:: chrono:: high_resolution_clock:: now();
    auto ms_int = std:: chrono:: duration_cast < std:: chrono:: seconds > (t2 - t1);
    std:: cout << "Execution took " << ms_int.count() << " seconds!\n";

    return 0;

}
Another difference is the usage of macros. In this version 0, I use constant variables (at the beginning of main) that are passed to the functions, whereas the other versions use macros.

The function calls won't noticeably affect your results.

Alas, I know nothing about molecular simulations, so I cannot offer much insight into this toy integrator.

If you are interested in working with large systems, it really would be worth your time to look at the stuff available online:
https://www.google.com/search?q=molecular+integrator+c/c++
Duthomhas wrote:
The function calls won't noticeably affect your results.


You are right, it's the macros!!

If I use V0 (i.e. with functions) but replace the constant variables from the beginning with the macros (like in V1 and V2), I get a runtime of 85s.

It makes sense, I guess. Parameters like L or sigma are used many times. If they are stored in memory, accessing them costs something compared to having their values inserted at compile time via the macros.
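
If you want to keep the compile-time-constant behaviour without going back to macros, one option (my suggestion, untested on your code) is file-scope constexpr constants; the compiler can fold their values into the generated code just like the #defines, but you keep real types and scoping:

// Sketch: the #defines from versions 1/2 as typed constexpr constants.
constexpr double L        = 10.0;
constexpr int    N_iter   = 20000;
constexpr int    n_part   = 1000;
constexpr double beta     = 300.0;
constexpr double friction = 0.05;      // 'gamma' can collide with a legacy ::gamma() in <math.h>
constexpr double kappa    = 1.0 / n_part;
constexpr double sigma_2  = 1.0;
constexpr double stepsize = 0.1;

That is probably the real difference here: when L or sigma_2 arrive as const double& parameters in separate, non-inlined functions, the optimizer can no longer see their values at compile time, whereas macros or constexpr globals keep them visible as literals.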