root/OpenMD/branches/development/src/parallel/Parallel.cpp
Revision: 1665
Committed: Tue Nov 22 20:38:56 2011 UTC (13 years, 5 months ago) by gezelter
File size: 26973 byte(s)
Log Message:
updated copyright notices

File Contents

# Content
1 /**
2 * @file Parallel.cpp
3 * @author Charles Vardeman <cvardema.at.nd.edu>
4 * @date 08/18/2010
5 * @time 11:56am
6 * @version 1.0
7 *
8 * @section LICENSE
9 * Copyright (C) 2010 The University of Notre Dame. All Rights Reserved.
10 *
11 *
12 * The University of Notre Dame grants you ("Licensee") a
13 * non-exclusive, royalty free, license to use, modify and
14 * redistribute this software in source and binary code form, provided
15 * that the following conditions are met:
16 *
17 * 1. Redistributions of source code must retain the above copyright
18 * notice, this list of conditions and the following disclaimer.
19 *
20 * 2. Redistributions in binary form must reproduce the above copyright
21 * notice, this list of conditions and the following disclaimer in the
22 * documentation and/or other materials provided with the
23 * distribution.
24 *
25 * This software is provided "AS IS," without a warranty of any
26 * kind. All express or implied conditions, representations and
27 * warranties, including any implied warranty of merchantability,
28 * fitness for a particular purpose or non-infringement, are hereby
29 * excluded. The University of Notre Dame and its licensors shall not
30 * be liable for any damages suffered by licensee as a result of
31 * using, modifying or distributing the software or its
32 * derivatives. In no event will the University of Notre Dame or its
33 * licensors be liable for any lost revenue, profit or data, or for
34 * direct, indirect, special, consequential, incidental or punitive
35 * damages, however caused and regardless of the theory of liability,
36 * arising out of the use of or inability to use software, even if the
37 * University of Notre Dame has been advised of the possibility of
38 * such damages.
39 *
40 * SUPPORT OPEN SCIENCE! If you use OpenMD or its source code in your
41 * research, please cite the appropriate papers when you publish your
42 * work. Good starting points are:
43 *
44 * [1] Meineke, et al., J. Comp. Chem. 26, 252-271 (2005).
45 * [2] Fennell & Gezelter, J. Chem. Phys. 124, 234104 (2006).
46 * [3] Sun, Lin & Gezelter, J. Chem. Phys. 128, 24107 (2008).
47 * [4] Vardeman, Stocker & Gezelter, J. Chem. Theory Comput. 7, 834 (2011).
48 */
49
50
51 #include <stdlib.h>
52 #include "config.h"
53
54 #ifdef IS_MPI
55 #include <mpi.h>
56 #endif
57
58 #include <iostream>
59 #include <vector>
60 #include <algorithm>
61
62 #include "parallel/Parallel.hpp"
63
64
65 using namespace std;
66 using namespace OpenMD;
67
68 //#define DEBUG_PARALLEL
69
70
71 #ifdef SINGLE_PRECISION
72 #define MY_MPI_REAL MPI_FLOAT
73 #else
74 #define MY_MPI_REAL MPI_DOUBLE
75 #endif
76
77
78
79
80 //____ mpiAbort
81 static void mpiAbort();
82
83 void mpiAbort() {
84 if (Parallel::ok()) {
85 #ifdef IS_MPI
86 MPI::COMM_WORLD.Abort(1);
87 #endif
88 }
89 exit(EXIT_FAILURE);
90 }
91
92 //____ mpiExit
93 static void mpiExit();
94
95 void mpiExit() {
96 if (Parallel::ok())
97 Parallel::finalize();
98 exit(EXIT_SUCCESS);
99 }
100
101
102 //____ MPITypeTraits
103 template<typename T>
104 struct MPITypeTraits;
105
106 #ifdef IS_MPI
107 template<>
108 struct MPITypeTraits<RealType> {
109 static const MPI::Datatype datatype;
110 };
111 const MPI::Datatype MPITypeTraits<RealType>::datatype = MY_MPI_REAL;
112
113 template<>
114 struct MPITypeTraits<int> {
115 static const MPI::Datatype datatype;
116 };
117 const MPI::Datatype MPITypeTraits<int>::datatype = MPI_INT;
118
119
120 //____ allReduceScalar
121 template<typename T>
122 void allReduceScalar(T &begin) {
123 T tmp = begin;
124 /*
125 MPI_Allreduce(&tmp, &begin, 1, MPITypeTraits<T>::datatype, MPI_SUM,
126 (exludeMaster ? slaveComm : MPI_COMM_WORLD));
127 */
128 MPI::COMM_WORLD.Allreduce(&tmp, &begin, 1, MPITypeTraits<T>::datatype, MPI::SUM);
129 }
130
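/* Illustrative sketch (not part of the build): the MPITypeTraits
 * specializations above let allReduceScalar() select the matching MPI
 * datatype at compile time, so one template covers RealType and int.
 * The function and variable names below are hypothetical.
 */
#if 0
static void exampleSumPotential() {
  RealType localPotential = computeLocalPotential();  // hypothetical helper
  allReduceScalar(localPotential);  // every rank now holds the global sum
}
#endif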
131 //____ allReduce
132 template<bool exludeMaster, bool dobarrier, typename T> // flags unused here; kept to match the call sites below
133 void allReduce(T *begin, T *end) {
134 vector<T> tmp(end - begin);
135 copy(begin, end, tmp.begin());
136 /* MPI_Allreduce(&(tmp[0]), begin, (end - begin), MPITypeTraits<T>::datatype,
137 MPI_SUM,
138 (exludeMaster ? slaveComm : MPI_COMM_WORLD));
139 */
140 MPI::COMM_WORLD.Allreduce(&tmp[0], begin, (end - begin), MPITypeTraits<T>::datatype, MPI::SUM);
141 }
142
143 template<bool exludeMaster, bool dobarrier>
144 void allReduce(Vector3DBlock *coords) {
145 allReduce<exludeMaster, dobarrier>(&(coords->begin()->x),
146 &(coords->end()->x));
147 }
148
149 template<bool exludeMaster, bool dobarrier>
150 void allReduce(ScalarStructure *energies) {
151 allReduce<exludeMaster, dobarrier>(
152 &((*energies)[ScalarStructure::FIRST]),
153 &((*energies)[ScalarStructure::LASTREDUCE]));
154 }
155
156 //____ broadcastScalar
157 template<bool exludeMaster, bool dobarrier, typename T>
158 void broadcastScalar(T &begin) {
159 if (dobarrier)
160 doBarrier<exludeMaster>();
161 MPI_Bcast(&begin, 1, MPITypeTraits<T>::datatype, master,
162 (exludeMaster ? slaveComm : MPI_COMM_WORLD));
163 }
164
165 //____ broadcast
166
167 template<bool exludeMaster, bool dobarrier, typename T>
168 void broadcast(T *begin, T *end) {
169 if (dobarrier)
170 doBarrier<exludeMaster>();
171 MPI_Bcast(begin, (end - begin), MPITypeTraits<T>::datatype,
172 (exludeMaster ? 0 : master),
173 (exludeMaster ? slaveComm : MPI_COMM_WORLD));
174 }
175
176 template<bool exludeMaster, bool dobarrier>
177 void broadcast(Vector3DBlock *coords) {
178 broadcast<exludeMaster, dobarrier>(&(coords->begin()->x),
179 &(coords->end()->x));
180 }
181
182 template<bool exludeMaster, bool dobarrier>
183 void broadcast(ScalarStructure *energies) {
184 broadcast<exludeMaster, dobarrier>(
185 &((*energies)[ScalarStructure::FIRST]),
186 &((*energies)[ScalarStructure::LASTREDUCE]));
187 }
188
189 #endif
190
191 //____ Parallel
192
193 #ifdef IS_MPI
194 const bool Parallel::isMPI = true;
195 #else
196 const bool Parallel::isMPI = false;
197 #endif
198
199 bool Parallel::myInitialized = false;
200 bool Parallel::myFinalized = false;
201 int Parallel::myId = 0;
202 int Parallel::myMasterId = master;
203 int Parallel::myNum = 1;
204 int Parallel::myAvailableId = 0;
205 int Parallel::myAvailableNum = 1;
206 bool Parallel::myIsParallel = Parallel::isMPI;
207 bool Parallel::myIAmMaster = true;
208 bool Parallel::myIAmSlave = true;
209 ParallelType Parallel::myMode = ParallelType::STATIC;
210 Parallel::WorkState Parallel::myWorkState = Parallel::SEQUENTIAL;
211
212 int Parallel::myPipeSize = 1;
213 bool Parallel::myUseBarrier = false;
214 int Parallel::myMaxPackages = -1;
215
216 int *Parallel::myBuffer = NULL;
217 int Parallel::myNext = 0;
218 int Parallel::myNextRange[2] = {
219 0, 0
220 };
221 vector<int> Parallel::myDone;
222 vector<int> Parallel::myBlockList;
223
224 int Parallel::myRecv;
225 int Parallel::myI;
226 int Parallel::myP;
227
228 Parallel * Parallel::obj = NULL;
229
230 int Parallel::myOldId = 0;
231 int Parallel::myOldNum = 1;
232 ParallelType Parallel::myOldMode = ParallelType::STATIC;
233 bool Parallel::myIsolated = false;
234
235 //____ Parallel
236
237 Parallel::~Parallel() {
238 }
239
240 Parallel::Parallel() {
241 myInitialized = false;
242 myFinalized = false;
243 myId = 0;
244 myMasterId = master;
245 myNum = 1;
246 myAvailableId = 0;
247 myAvailableNum = 1;
248 myIsParallel = Parallel::isMPI;
249 myIAmMaster = true;
250 myIAmSlave = true;
251 myMode = ParallelType::DYNAMIC;
252
253 myPipeSize = 1;
254 myUseBarrier = false;
255 myMaxPackages = -1;
256
257 myBuffer = NULL;
258 myNext = 0;
259 myNextRange[0] = 0;
260 myNextRange[1] = 0;
261 myDone.clear();
262 myBlockList.clear();
263
264 myOldId = 0;
265 myOldNum = 1;
266 myOldMode = ParallelType::STATIC;
267 myIsolated = false;
268 }
269
270 Parallel &Parallel::instance() {
271 // We have to do this ourselves ... M$ problem ...
272 if (obj == NULL) {
273 obj = new Parallel();
274 atexit(kill);
275 }
276 return *obj;
277 }
278
279 void Parallel::kill() {
280 Parallel *p = obj;
281 obj = NULL;
282 delete p;
283 }
284
285 void Parallel::init(int &argc, char ** &argv) {
286 instance();
287 if (!myInitialized && !myFinalized) {
288 #ifdef IS_MPI
289 MPI_Init(&argc, &argv);
290 MPI_Comm_size(MPI_COMM_WORLD, &myNum);
291 MPI_Comm_rank(MPI_COMM_WORLD, &myId);
292 MPI_Barrier(MPI_COMM_WORLD);
293 setOpenMDStartSerial(mpiStartSerial);
294 setOpenMDEndSerial(mpiEndSerial);
295 #endif
296 setOpenMDAbort(mpiAbort);
297 setOpenMDExit(mpiExit);
298 myInitialized = true;
299 myIsParallel = (myNum > 1);
300 myIAmMaster = (myId == myMasterId);
301 report.setIAmMaster(iAmMaster());
302 setMode(isParallel() ? ParallelType::DYNAMIC : ParallelType::STATIC);
303 setPipeSize(myPipeSize);
304 TimerStatistic::setParallel(myIsParallel);
305 if (iAmMaster() && isParallel())
306 myDone.resize(myNum);
307
308 report << plain << (isMPI ? "Using MPI." : "No MPI compilation.") << endr;
309 } else
310 if (iAmMaster())
311 report << recoverable << "MPI is" <<
312 ((myInitialized) ? " " : " not ") << "initialized and is" <<
313 ((myFinalized) ? " " : " not ") <<
314 "finalized. Called [Parallel::init].\n" << endr;
315 }
316
317 void Parallel::finalize() {
318 if (ok("Called [Parallel::finalize].")) {
319 setOpenMDAbort(NULL);
320 setOpenMDExit(NULL);
321 setOpenMDStartSerial(NULL);
322 setOpenMDEndSerial(NULL);
323 #ifdef IS_MPI
324 if (iAmSlave())
325 MPI_Comm_free(&slaveComm);
326
327 if (myBuffer != NULL) {
328 int size;
329 MPI_Buffer_detach(myBuffer, &size);
330 delete[] myBuffer;
331 myBuffer = NULL;
332 }
333 FFTComplex::FFTComplexMPIFinalize();
334 MPI_Finalize();
335 #endif
336 myFinalized = true;
337 }
338 }
339
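/* Illustrative sketch (not part of the build): a driver brackets the
 * whole run with init()/finalize().  init() installs the abort/exit
 * hooks and discovers rank and size; finalize() releases slaveComm and
 * the Bsend buffer before MPI_Finalize().
 */
#if 0
int main(int argc, char **argv) {
  Parallel::init(argc, argv);
  // ... build the simulation and integrate ...
  Parallel::sync();       // make sure every rank has finished
  Parallel::finalize();
  return 0;
}
#endif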
340 bool Parallel::ok(const string &err) {
341 if (ok())
342 return true;
343
344 if (iAmMaster())
345 report << recoverable << "MPI is" << ((myInitialized) ? " " : " not ") <<
346 "initialized and is" << ((myFinalized) ? " " : " not ") <<
347 "finalized. " << err << "\n" << endr;
348 return false;
349 }
350
351 void Parallel::setMode(ParallelType mode) {
352 if (!ok("Called [Parallel::setMasterSlave]"))
353 return;
354 #ifndef IS_MPI
355 mode = ParallelType::STATIC;
356 #endif
357 if (!isParallel() || mode == ParallelType::UNDEFINED)
358 mode = ParallelType::STATIC;
359
360 myMode = mode;
361 myAvailableNum = (mode == ParallelType::MASTERSLAVE ? myNum - 1 : myNum);
362 myIAmSlave = (mode == ParallelType::MASTERSLAVE ? (!myIAmMaster) :
363 true);
364 myAvailableId =
365 (myNum == myAvailableNum ? myId :
366 (myId == myMasterId ? -1 : (myId < myMasterId ? myId : myId - 1)));
367 #ifdef IS_MPI
368 if (slaveComm != MPI_COMM_NULL)
369 MPI_Comm_free(&slaveComm);
370
371 // Create intracommunicator only with slaves
372 MPI_Group worldGroup = MPI_GROUP_NULL;
373 MPI_Group slaveGroup = MPI_GROUP_NULL;
374 int excl[] = {
375 myMasterId
376 };
377
378 MPI_Comm_group(MPI_COMM_WORLD, &worldGroup);
379 MPI_Group_excl(worldGroup, myNum - myAvailableNum, excl, &slaveGroup);
380 MPI_Comm_create(MPI_COMM_WORLD, slaveGroup, &slaveComm);
381 MPI_Group_free(&worldGroup);
382 MPI_Group_free(&slaveGroup);
383 if (slaveComm != MPI_COMM_NULL)
384 MPI_Comm_rank(slaveComm, &myAvailableId);
385 FFTComplex::FFTComplexMPIInit(
386 mode == ParallelType::MASTERSLAVE ? myMasterId : -1);
387 #endif
388 #ifdef DEBUG_PARALLEL
389 report << allnodesserial << "ParallelMode (" << getId() << ") : "
390 << getMode().getString() << endr;
391 #endif
392 }
393
394 void Parallel::setPipeSize(int n) {
395 if (!ok("Called [Parallel::setPipeSize]"))
396 return;
397 myPipeSize = max(n, 0);
398
399 #ifdef IS_MPI
400 if (myBuffer != NULL) {
401 int size;
402 MPI_Buffer_detach(myBuffer, &size);
403 delete[] myBuffer;
404 myBuffer = NULL;
405 }
406
407 // Allocate buffer for Bsend()
408 const int size = 3 * 2 * myNum * (myPipeSize + 1) + MPI_BSEND_OVERHEAD + 10;
409 myBuffer = new int[size];
410 for (int i = 0; i < size; i++)
411 myBuffer[i] = 0;
412
413 MPI_Buffer_attach(myBuffer, static_cast<int>(size * sizeof(int)));
414 #endif
415 }
416
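/* Worked example of the Bsend buffer sizing in setPipeSize() above: with
 * myNum = 4 and myPipeSize = 1, size = 3 * 2 * 4 * (1 + 1) +
 * MPI_BSEND_OVERHEAD + 10 = 58 + MPI_BSEND_OVERHEAD ints, which leaves
 * generous room for the two-int range messages that resetNext() and
 * nextMaster() push to the slaves with MPI_Bsend().
 */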
417 void Parallel::setMaxPackages(int n) {
418 if (!ok("Called [Parallel::setMaxPackages]"))
419 return;
420 myMaxPackages = n < 0 ? (isDynamic() ? 3 : 0) :
421 n;
422 }
423
424 Parallel::WorkState Parallel::getWorkState() {
425 if (!isParallel())
426 return SEQUENTIAL;
427 else if (myMode == ParallelType::STATIC)
428 return STATIC;
429 else if (iAmMaster() && myMode == ParallelType::DYNAMIC)
430 return MASTER;
431 else
432 return SLAVE;
433 }
434
435 void Parallel::sync() {
436 #ifdef IS_MPI
437 MPI_Barrier(MPI_COMM_WORLD);
438 #endif
439 }
440
441 void Parallel::syncSlave() {
442 #ifdef IS_MPI
443 if (!iAmSlave() || !isParallel())
444 return;
445 MPI_Barrier(slaveComm);
446 #endif
447 }
448
449 // Sends a Vector3DBlock over MPI as an array.
450 #ifdef IS_MPI
451 void Parallel::send(Vector3DBlock *vect, int address) {
452 /* Create a C-style array large enough to hold all the real values
453 * (3 per Vector3D)
454 */
455 int size = vect->size();
456 Real *vectArray = new Real[3 * size];
457 if (vectArray == 0) {
458 cout << "Can't create Parallel::send() array! Quitting!" << endl;
459 MPI_Abort(MPI_COMM_WORLD, 1);
460 }
461
462 /* Start dumping the Vector3D into the array. This keeps us from having to
463 * create an MPI struct, pack it, and in general, mess with all that.
464 * It's inefficient, but it's at least a start. It's designed such that
465 * the array looks like {x1 y1 z1 x2 y2 z2 ... xN yN zN} for N Vector3D's */
466 for (int i = 0; i < size; i++) {
467 vectArray[3 * i] = (*vect)[i][0];
468 vectArray[3 * i + 1] = (*vect)[i][1];
469 vectArray[3 * i + 2] = (*vect)[i][2];
470 }
471
472 /* Since it's an array of Reals, we can use the plain old Parallel::send
473 * routine; MPI handles both single values and arrays with the same
474 * function call */
475 send(vectArray, 3 * size, address);
476 delete[] vectArray;
477 }
478
479 #else
480 void Parallel::send(Vector3DBlock *, int) {}
481
482 #endif
483
484 // Same philosophy as Parallel::send, except we're receiving an array
485 #ifdef IS_MPI
486 void Parallel::recv(Vector3DBlock *vect, int address) {
487 /* Create an array of proper size. */
488 int size = vect->size();
489 Real *vectArray = new Real[3 * size];
490 if (vectArray == 0) {
491 cout << "Can't create Parallel::recv() array! Quitting!" << endl;
492 MPI_Abort(MPI_COMM_WORLD, 1);
493 }
494 /* Since it's an array, the Parallel::recv() call is sufficient (don't you
495 * just love function overloading?) */
496 recv(vectArray, 3 * size, address);
497 /* Map the vector back onto an actual Vector3DBlock. The array looks like
498 * this: {x1 y1 z1 x2 y2 z2 ... xN yN zN} */
499 for (int i = 0; i < size; i++) {
500 (*vect)[i][0] = vectArray[3 * i];
501 (*vect)[i][1] = vectArray[3 * i + 1];
502 (*vect)[i][2] = vectArray[3 * i + 2];
503 }
504
505 delete[] vectArray;
506 }
507
508 #else
509 void Parallel::recv(Vector3DBlock *, int) {}
510
511 #endif
512
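/* Illustrative sketch (not part of the build): the send()/recv() pair
 * above ships a whole Vector3DBlock between two ranks; both sides must
 * use blocks of the same size, since the payload is a flat array of
 * 3 * size Reals.  positions is hypothetical, and the master is assumed
 * to be rank 0.
 */
#if 0
static void exampleShipPositions(Vector3DBlock &positions) {
  if (Parallel::iAmMaster())
    Parallel::recv(&positions, 1);   // collect coordinates from rank 1
  else if (Parallel::getId() == 1)
    Parallel::send(&positions, 0);   // send them to the master
}
#endif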
513 // Overwrites the Vector3D with a new one after sending it to another node
514 #ifdef IS_MPI
515 void Parallel::sendrecv_replace(Vector3DBlock *vect, int sendaddr,
516 int recvaddr) {
517 int size = vect->size();
518 Real *vectArray = new Real[3 * size];
519 if (vectArray == 0) {
520 cout << "Can't create Parallel::send() array! Quitting!" << endl;
521 MPI_Abort(MPI_COMM_WORLD, 1);
522 }
523 // Familiar mapping strategy...
524 for (int i = 0; i < size; i++) {
525 vectArray[3 * i] = (*vect)[i][0];
526 vectArray[3 * i + 1] = (*vect)[i][1];
527 vectArray[3 * i + 2] = (*vect)[i][2];
528 }
529
530 // Reuse of function calls to actually handle the MPI calls
531 sendrecv_replace(vectArray, 3 * size, sendaddr, recvaddr);
532 // Map it back into the Vector3DBlock and nobody will ever know we mucked with
533 // it... 8-)
534 for (int i = 0; i < size; i++) {
535 (*vect)[i][0] = vectArray[3 * i];
536 (*vect)[i][1] = vectArray[3 * i + 1];
537 (*vect)[i][2] = vectArray[3 * i + 2];
538 }
539
540 delete[] vectArray;
541 }
542
543 #else
544 void Parallel::sendrecv_replace(Vector3DBlock *, int, int) {}
545
546 #endif
547
548 #ifdef IS_MPI
549 void Parallel::send(Real *data, int num, int address) {
550 // Just a nice wrapper that automatically selects the MPI datatype for you
551 // and handles all the annoying things
552 MPI_Send(data, num, MPITypeTraits<Real>::datatype, address, 0,
553 MPI_COMM_WORLD);
554 }
555
556 #else
557 void Parallel::send(Real *, int, int) {}
558
559 #endif
560
561 #ifdef IS_MPI
562 void Parallel::recv(Real *data, int num, int address) {
563 // Another MPI wrapper function....
564 MPI_Status status;
565 MPI_Recv(data, num, MPITypeTraits<Real>::datatype, address, 0,
566 MPI_COMM_WORLD,
567 &status);
568 }
569
570 #else
571 void Parallel::recv(Real *, int, int) {}
572
573 #endif
574
575 #ifdef IS_MPI
576 void Parallel::sendrecv(Real *senddata, int sendnum, int sendaddr,
577 Real *recvdata, int recvnum,
578 int recvaddr) {
579 MPI_Status status;
580 MPI_Sendrecv(senddata, sendnum, MPITypeTraits<Real>::datatype, sendaddr, 0,
581 recvdata, recvnum, MPITypeTraits<Real>::datatype, recvaddr, 0,
582 MPI_COMM_WORLD,
583 &status);
584 }
585
586 #else
587 void Parallel::sendrecv(Real *, int, int, Real *, int, int) {}
588
589 #endif
590
591 #ifdef IS_MPI
592 void Parallel::sendrecv_replace(Real *data, int num, int sendaddr,
593 int recvaddr) {
594 MPI_Status status;
595 MPI_Sendrecv_replace(data, num, MPITypeTraits<Real>::datatype, sendaddr, 0,
596 recvaddr, 0, MPI_COMM_WORLD,
597 &status);
598 }
599
600 #else
601 void Parallel::sendrecv_replace(Real *, int, int, int) {}
602
603 #endif
604
605 #ifdef IS_MPI
606 void Parallel::gather(Real *data, int num, Real *data_array, int address) {
607 MPI_Gather(data, num, MPITypeTraits<Real>::datatype, data_array, num,
608 MPITypeTraits<Real>::datatype, address,
609 MPI_COMM_WORLD);
610 }
611
612 #else
613 void Parallel::gather(Real *, int, Real *, int) {}
614
615 #endif
616
617 #ifdef IS_MPI
618 void Parallel::allgather(Real *data, int num, Real *data_array) {
619 MPI_Allgather(data, num, MPITypeTraits<Real>::datatype, data_array, num,
620 MPITypeTraits<Real>::datatype,
621 MPI_COMM_WORLD);
622 }
623
624 #else
625 void Parallel::allgather(Real *, int, Real *) {}
626
627 #endif
628
629 #ifdef IS_MPI
630 void Parallel::reduceSlaves(Real *begin, Real *end) {
631 if (!iAmSlave() || !isParallel())
632 return;
633 TimerStatistic::timer[TimerStatistic::COMMUNICATION].start();
634
635 allReduce<true, true>(begin, end);
636
637 TimerStatistic::timer[TimerStatistic::COMMUNICATION].stop();
638 }
639
640 #else
641 void Parallel::reduceSlaves(Real *, Real *) {}
642
643 #endif
644
645 #ifdef IS_MPI
646 void Parallel::distribute(ScalarStructure *energies, Vector3DBlock *coords) {
647 if (!iAmMaster() && !energies->distributed()) {
648 energies->clear();
649 coords->zero();
650 }
651 energies->distribute();
652 coords->distribute();
653 }
654
655 #else
656 void Parallel::distribute(ScalarStructure *, Vector3DBlock *) {}
657
658 #endif
659
660 #ifdef IS_MPI
661 void Parallel::reduce(ScalarStructure *energies, Vector3DBlock *coords) {
662 energies->reduce();
663 coords->reduce();
664 if (!isParallel() || energies->distributed())
665 return;
666 TimerStatistic::timer[TimerStatistic::COMMUNICATION].start();
667
668 allReduce<false, true>(coords);
669 allReduce<false, false>(energies);
670
671 TimerStatistic::timer[TimerStatistic::COMMUNICATION].stop();
672 }
673
674 #else
675 void Parallel::reduce(ScalarStructure *, Vector3DBlock *) {}
676
677 #endif
678
679 #ifdef IS_MPI
680 void Parallel::bcast(Vector3DBlock *coords) {
681 if (!isParallel())
682 return;
683 broadcast<false, true>(coords);
684 }
685
686 #else
687 void Parallel::bcast(Vector3DBlock *) {}
688
689 #endif
690
691 #ifdef IS_MPI
692 void Parallel::bcast(int &n) {
693 if (!isParallel())
694 return;
695 broadcastScalar<false, true>(n);
696 }
697
698 #else
699 void Parallel::bcast(int &) {}
700
701 #endif
702
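/* Illustrative sketch (not part of the build): bcast(int &) above pushes
 * a master-side decision, such as a step count, to every rank.  The
 * function and variable names below are hypothetical.
 */
#if 0
static void exampleBroadcastSteps() {
  int nSteps = 0;
  if (Parallel::iAmMaster())
    nSteps = readStepCountFromInput();  // hypothetical helper
  Parallel::bcast(nSteps);              // every rank now agrees on nSteps
}
#endif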
703 #ifdef IS_MPI
704 void Parallel::bcastSlaves(Real *begin, Real *end) {
705 if (!iAmSlave() || !isParallel())
706 return;
707 TimerStatistic::timer[TimerStatistic::COMMUNICATION].start();
708
709 broadcast<true, true>(begin, end);
710
711 TimerStatistic::timer[TimerStatistic::COMMUNICATION].stop();
712 }
713
714 #else
715 void Parallel::bcastSlaves(Real *, Real *) {}
716
717 #endif
718
719 unsigned int Parallel::getNumberOfPackages(unsigned int n) {
720 if (getMaxPackages() < 1 ||
721 static_cast<unsigned int>(getAvailableNum()) >= n)
722 return n;
723 return min(n / getAvailableNum(),
724 static_cast<unsigned int>(getMaxPackages()))
725 * static_cast<unsigned int>(getAvailableNum());
726 }
727
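/* Worked example for getNumberOfPackages() above: with n = 100 work
 * units, getAvailableNum() = 4 and getMaxPackages() = 3, the result is
 * min(100 / 4, 3) * 4 = 12 packages.  With only n = 3 units and 4
 * available ranks, n is returned unchanged.
 */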
728 void Parallel::resetNext(const vector<int> &blocks) {
729 resetNext();
730 if (!iAmMaster())
731 return;
732 #ifdef IS_MPI
733 if (blocks.empty())
734 return;
735
736 // Vector of ranges (n0, n1, n2, ... , nM, -1, -1, ..., -1)
737 // n0, n1, n2, ... , nM are the numbers of the next() calls a slave
738 // will make for one given force.
739 // A slave calls next() and checks whether its current count of
740 // next() calls lies inside the range received from the master,
741 // and computes if it does.
742 int n = 0;
743 for (unsigned int i = 0; i < blocks.size(); i++)
744 n += blocks[i];
745
746 if (n < 1)
747 return;
748
749 myBlockList.resize(n + 1 + myAvailableNum + 1);
750 for (unsigned int i = 0; static_cast<int>(i) <= n; i++)
751 myBlockList[i] = i;
752
753 // Adding ranges (-1,-1) to indicate that there is no more
754 // to compute.
755 for (unsigned int i = n + 1; i < myBlockList.size(); i++)
756 myBlockList[i] = -1;
757
758 #ifdef DEBUG_PARALLEL
759 report << allnodes << plain << "Blocks :";
760 for (unsigned int i = 0; i < myBlockList.size(); i++)
761 report << myBlockList[i] << " ";
762
763 report << endr;
764 #endif
765 // Fill up the pipe ...
766 myI = 0; // Current range index
767 myRecv = 1; // Number of pending receives
768 // One more since one slave will send a request but not call
769 // recv for the last range
770 int stop = 0; //
771 myP = 0;
772 myDone[myId] = 0;
773 for (int j = 0; j < (myPipeSize + 1) * myNum; j++) {
774 if (stop == myNum - 1)
775 break;
776
777 int myP = j % myNum;
778 if (myP == myMasterId)
779 continue;
780
781 MPI_Bsend(&(myBlockList[myI]), 2, MPI_INT, myP, SEND_RANGE,
782 MPI_COMM_WORLD);
783 #ifdef DEBUG_PARALLEL
784 report << allnodes << plain << "Block[" << myI << "] = [" <<
785 myBlockList[myI] << "," <<
786 myBlockList[myI + 1] << "] to " << myP << "." << endr;
787 #endif
788 myDone[myP] = (myBlockList[myI + 2] < 0 ? 1 : 0);
789 if (myDone[myP] != 0)
790 stop++;
791 if (myDone[myP] == 0)
792 myRecv++;
793 myI++;
794 }
795
796 if (myMode == ParallelType::MASTERSLAVE)
797 nextMaster();
798 #endif
799 }
800
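/* Worked example for resetNext() above: blocks = {2, 3} gives n = 5, so
 * myBlockList holds {0, 1, 2, 3, 4, 5, -1, -1, ...}.  Each range handed
 * to a slave is the pair (myBlockList[myI], myBlockList[myI + 1]); a
 * received range whose upper bound is -1 tells the slave that no
 * further work remains.
 */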
801 void Parallel::nextMaster() {
802 #ifdef IS_MPI
803 #ifdef DEBUG_PARALLEL
804 report << allnodes << plain << "Parallel::nextMaster Recv " << myRecv <<
805 "." << endr;
806 #endif
807 while (myRecv > 0) {
808 int test = 0;
809 MPI_Status status;
810 char tmp[1];
811
812 // Receive the request for a new range from a slave.
813 // We wait until we get a message. Note that waiting for ANY_SOURCE
814 // could lead to starvation of some nodes.
815 while (!test) {
816 myP = (1 + myP) % myNum;
817 if (myP != myMasterId)
818 MPI_Iprobe(myP, NEED_RANGE, MPI_COMM_WORLD, &test, &status);
819 }
820
821 MPI_Recv(tmp, 0, MPI_CHAR, myP, NEED_RANGE, MPI_COMM_WORLD, &status);
822 myRecv--;
823 #ifdef DEBUG_PARALLEL
824 report << allnodes << plain << "Receive from " << myP << "." << endr;
825 report << allnodes << plain << "Recv " << myRecv << "." << endr;
826 #endif
827
828 // We skip sending a further range if the node already got the
829 // last range, since that node will terminate without receiving
830 // any more messages.
831 if (myDone[myP] == 0) {
832 MPI_Bsend(&(myBlockList[myI]), 2, MPI_INT, myP, SEND_RANGE,
833 MPI_COMM_WORLD);
834 #ifdef DEBUG_PARALLEL
835 report << allnodes << plain << "Block[" << myI << "] = [" <<
836 myBlockList[myI] << "," <<
837 myBlockList[myI + 1] << "] to " << myP << "." << endr;
838 #endif
839 myDone[myP] = (myBlockList[myI + 2] < 0 ? 1 : 0);
840 if (myDone[myP] == 0)
841 myRecv++;
842 myI++;
843 }
844 }
845 #endif
846
847 }
848
849 bool Parallel::next() {
850 bool doNext = true;
851
852 #ifdef DEBUG_PARALLEL
853 int oldNext = myNext;
854 #endif
855 switch (myWorkState) {
856 #ifdef IS_MPI
857 case MASTER:
858
859 doNext = false;
860
861
862
863 do {
864 for (int k = 0; k < myNum; ++k) {
865 myP = (1 + myP) % myNum;
866 if (myP == myMasterId)
867 continue;
868
869 int test = 0;
870 MPI_Status status;
871 char tmp[1];
872
873 MPI_Iprobe(myP, NEED_RANGE, MPI_COMM_WORLD, &test, &status);
874
875 if (test) {
876 MPI_Recv(tmp, 0, MPI_CHAR, myP, NEED_RANGE, MPI_COMM_WORLD, &status);
877 myRecv--;
878 #ifdef DEBUG_PARALLEL
879 report << allnodes << plain << "Receive from " << myP << "." <<
880 endr;
881 #endif
882
883 // We skip sending a further range if the node already got the
884 // last range, since that node will terminate without receiving
885 // any more messages.
886 if (myDone[myP] == 0) {
887 MPI_Bsend(&(myBlockList[myI]), 2, MPI_INT, myP, SEND_RANGE,
888 MPI_COMM_WORLD);
889 #if defined (DEBUG_PARALLEL)
890 report << allnodes << plain << "Block[" << myI << "] = [" <<
891 myBlockList[myI] << "," <<
892 myBlockList[myI + 1] << "] to " << myP << "." << endr;
893 #endif
894 myDone[myP] = (myBlockList[myI + 2] < 0 ? 1 : 0);
895 if (myDone[myP] == 0)
896 myRecv++;
897 myI++;
898 }
899 }
900 }
901 } while (myBlockList[myI + 2] < 0 && myBlockList[myI + 0] >= 0 &&
902 myBlockList[myI + 1] >= 0);
903
904 if (myNextRange[1] == myNext) {
905 myNextRange[0] = myBlockList[myI];
906 myNextRange[1] = myBlockList[myI + 1];
907 myDone[myId] = (myBlockList[myI + 2] < 0 ? 1 : 0);
908 #if defined (DEBUG_PARALLEL)
909 report << allnodes << plain << "Block[" << myI << "] = [" <<
910 myBlockList[myI] << "," <<
911 myBlockList[myI + 1] << "] to " << getId() << "." << endr;
912 #endif
913 myI++;
914 }
915 doNext = !(myNext < myNextRange[0] || myNextRange[1] < 0);
916 myNext++;
917
918 #ifdef DEBUG_PARALLEL
919 report << allnodes << plain << "Recv " << myRecv << "." << endr;
920 #endif
921 if (myDone[myId] != 0)
922 nextMaster();
923
924 break;
925
926 case Parallel::SLAVE:
927 if (myNextRange[1] == myNext) {
928 // We need a new range from the master
929 char tmp[1];
930 MPI_Status status;
931 TimerStatistic::timer[TimerStatistic::IDLE].start();
932 #ifdef DEBUG_PARALLEL
933 report << allnodes << plain << "Recv new block " << getId() << "." <<
934 endr;
935 #endif
936 MPI_Recv(myNextRange, 2, MPI_INT, myMasterId, SEND_RANGE,
937 MPI_COMM_WORLD,
938 &status);
939 #ifdef DEBUG_PARALLEL
940 report << allnodes << plain << "Recv new block " << getId() << " [" <<
941 myNextRange[0] << "," << myNextRange[1] << "]." << endr;
942 #endif
943 // Ask for the next range now so that we have it when we need it next time.
944 TimerStatistic::timer[TimerStatistic::IDLE].stop();
945 if (myNextRange[1] >= 0) {
946 MPI_Bsend(tmp, 0, MPI_CHAR, myMasterId, NEED_RANGE, MPI_COMM_WORLD);
947 #ifdef DEBUG_PARALLEL
948 report << allnodes << plain << "Ask new block " << getId() << "." <<
949 endr;
950 #endif
951 }
952 if (myNextRange[1] < 0)
953 myWorkState = DONE;
954 }
955
956 doNext = !(myNext < myNextRange[0] || myNextRange[1] < 0);
957 myNext++;
958
959 break;
960 #endif
961
962 case DONE:
963 doNext = false;
964 break;
965
966 case STATIC:
967 doNext = (myNext == myId);
968 myNext = (myNext + 1) % myNum;
969 break;
970
971 case SEQUENTIAL:
972 default:
973 break;
974 }
975
976 #ifdef DEBUG_PARALLEL
977 report << allnodes << plain << "Next (" << getId() << ") : "
978 << (bool)doNext << ", " << myWorkState << ", " << oldNext << ", ["
979 << myNextRange[0] << "," << myNextRange[1] << "]" << endr;
980 #endif
981 return doNext;
982 }
983
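/* Illustrative sketch (not part of the build): a force evaluation drives
 * the work distribution with resetNext()/next().  Every rank calls
 * next() once per block and does the work only when it returns true.
 * numBlocks and computeBlock() are hypothetical.
 */
#if 0
static void exampleWorkLoop(int numBlocks) {
  std::vector<int> blocks(1, numBlocks);  // one force term with numBlocks blocks
  Parallel::resetNext(blocks);
  for (int i = 0; i < numBlocks; ++i)
    if (Parallel::next())    // true only for blocks assigned to this rank
      computeBlock(i);       // hypothetical per-block kernel
}
#endif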
984 void Parallel::isolateNode() {
985 if (myIsolated == false) {
986 myOldId = myId;
987 myOldNum = myNum;
988 myOldMode = myMode;
989 myIsolated = true;
990
991 myId = 0;
992 myNum = 1;
993 myIsParallel = false;
994 myIAmMaster = (myId == myMasterId);
995 }
996 }
997
998 void Parallel::integrateNode() {
999 if (myIsolated == true) {
1000 myId = myOldId;
1001 myNum = myOldNum;
1002 myIsolated = false;
1003
1004 myIsParallel = (myNum > 1);
1005 myIAmMaster = (myId == myMasterId);
1006 }
1007 }
1008
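/* Illustrative sketch (not part of the build): isolateNode() lets a rank
 * temporarily behave like a serial process, for node-local work such as
 * writing a per-rank file; integrateNode() restores the saved parallel
 * state.  writeLocalFile() is hypothetical.
 */
#if 0
static void exampleNodeLocalOutput() {
  Parallel::isolateNode();
  writeLocalFile();        // hypothetical node-local I/O
  Parallel::integrateNode();
}
#endif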
1009
1010
1011
1012
1013
1014
1015
1016
1017