matrices.cc
Go to the documentation of this file.
1 // LIC// ====================================================================
2 // LIC// This file forms part of oomph-lib, the object-oriented,
3 // LIC// multi-physics finite-element library, available
4 // LIC// at http://www.oomph-lib.org.
5 // LIC//
6 // LIC// Copyright (C) 2006-2024 Matthias Heil and Andrew Hazel
7 // LIC//
8 // LIC// This library is free software; you can redistribute it and/or
9 // LIC// modify it under the terms of the GNU Lesser General Public
10 // LIC// License as published by the Free Software Foundation; either
11 // LIC// version 2.1 of the License, or (at your option) any later version.
12 // LIC//
13 // LIC// This library is distributed in the hope that it will be useful,
14 // LIC// but WITHOUT ANY WARRANTY; without even the implied warranty of
15 // LIC// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
16 // LIC// Lesser General Public License for more details.
17 // LIC//
18 // LIC// You should have received a copy of the GNU Lesser General Public
19 // LIC// License along with this library; if not, write to the Free Software
20 // LIC// Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
21 // LIC// 02110-1301 USA.
22 // LIC//
23 // LIC// The authors may be contacted at oomph-lib@maths.man.ac.uk.
24 // LIC//
25 // LIC//====================================================================
26 // Non-inline member functions for the matrix classes
27 
28 #ifdef OOMPH_HAS_MPI
29 #include "mpi.h"
30 #endif
31 
32 #include <cstring>
33 
34 #include <set>
35 #include <map>
36 
37 //#include <valgrind/callgrind.h>
38 
39 // oomph-lib headers
40 #include "matrices.h"
41 #include "linear_solver.h"
42 
43 
44 namespace oomph
45 {
46  //============================================================================
47  /// Complete LU solve (overwrites RHS with solution). This is the
48  /// generic version which should not need to be over-written.
49  //============================================================================
51  {
52 #ifdef PARANOID
53  if (Linear_solver_pt == 0)
54  {
55  throw OomphLibError("Linear_solver_pt not set in matrix",
56  OOMPH_CURRENT_FUNCTION,
57  OOMPH_EXCEPTION_LOCATION);
58  }
59 #endif
60 
61  // Copy rhs vector into local storage so it doesn't get overwritten
62  // if the linear solver decides to initialise the solution vector, say,
63  // which it's quite entitled to do!
64  DoubleVector actual_rhs(rhs);
65 
66  // Use the linear algebra interface to the linear solver
67  Linear_solver_pt->solve(this, actual_rhs, rhs);
68  }
69 
70  //============================================================================
71  /// Complete LU solve (Nothing gets overwritten!). This generic
72  /// version should never need to be overwritten
73  //============================================================================
75  {
76 #ifdef PARANOID
77  if (Linear_solver_pt == 0)
78  {
79  throw OomphLibError("Linear_solver_pt not set in matrix",
80  OOMPH_CURRENT_FUNCTION,
81  OOMPH_EXCEPTION_LOCATION);
82  }
83 #endif
84  // Use the linear algebra interface to the linear solver
85  Linear_solver_pt->solve(this, rhs, soln);
86  }
87 
88  //============================================================================
89  /// Complete LU solve (overwrites RHS with solution). This is the
90  /// generic version which should not need to be over-written.
91  //============================================================================
93  {
94 #ifdef PARANOID
95  if (Linear_solver_pt == 0)
96  {
97  throw OomphLibError("Linear_solver_pt not set in matrix",
98  OOMPH_CURRENT_FUNCTION,
99  OOMPH_EXCEPTION_LOCATION);
100  }
101 #endif
102 
103  // Copy rhs vector into local storage so it doesn't get overwritten
104  // if the linear solver decides to initialise the solution vector, say,
105  // which it's quite entitled to do!
106  Vector<double> actual_rhs(rhs);
107 
108  // Use the linear algebra interface to the linear solver
109  Linear_solver_pt->solve(this, actual_rhs, rhs);
110  }
111 
112  //============================================================================
113  /// Complete LU solve (Nothing gets overwritten!). This generic
114  /// version should never need to be overwritten
115  //============================================================================
117  {
118 #ifdef PARANOID
119  if (Linear_solver_pt == 0)
120  {
121  throw OomphLibError("Linear_solver_pt not set in matrix",
122  OOMPH_CURRENT_FUNCTION,
123  OOMPH_EXCEPTION_LOCATION);
124  }
125 #endif
126  // Use the linear algebra interface to the linear solver
127  Linear_solver_pt->solve(this, rhs, soln);
128  }
129 
130 
131  /// /////////////////////////////////////////////////////////////////////
132  /// /////////////////////////////////////////////////////////////////////
133  /// /////////////////////////////////////////////////////////////////////
134 
135  //===============================================================
136  /// Constructor, set the default linear solver to be the DenseLU
137  /// solver
138  //===============================================================
140  {
142  }
143 
144  //==============================================================
145  /// Constructor to build a square n by n matrix.
146  /// Set the default linear solver to be DenseLU
147  //==============================================================
148  DenseDoubleMatrix::DenseDoubleMatrix(const unsigned long& n)
149  : DenseMatrix<double>(n)
150  {
152  }
153 
154 
155  //=================================================================
156  /// Constructor to build a matrix with n rows and m columns.
157  /// Set the default linear solver to be DenseLU
158  //=================================================================
159  DenseDoubleMatrix::DenseDoubleMatrix(const unsigned long& n,
160  const unsigned long& m)
161  : DenseMatrix<double>(n, m)
162  {
164  }
165 
166  //=====================================================================
167  /// Constructor to build a matrix with n rows and m columns,
168  /// with initial value initial_val
169  /// Set the default linear solver to be DenseLU
170  //=====================================================================
171  DenseDoubleMatrix::DenseDoubleMatrix(const unsigned long& n,
172  const unsigned long& m,
173  const double& initial_val)
174  : DenseMatrix<double>(n, m, initial_val)
175  {
177  }
178 
179  //=======================================================================
180  /// Destructor delete the default linear solver
181  //======================================================================
183  {
184  // Delete the default linear solver
186  }
187 
188  //============================================================================
189  /// LU decompose a matrix, by using the default linear solver
190  /// (DenseLU)
191  //============================================================================
193  {
194  // Use the default (DenseLU) solver to ludecompose the matrix
195  static_cast<DenseLU*>(Default_linear_solver_pt)->factorise(this);
196  }
197 
198 
199  //============================================================================
200  /// Back substitute an LU decomposed matrix.
201  //============================================================================
203  {
204  // Use the default (DenseLU) solver to perform the backsubstitution
205  static_cast<DenseLU*>(Default_linear_solver_pt)->backsub(rhs, rhs);
206  }
207 
208  //============================================================================
209  /// Back substitute an LU decomposed matrix.
210  //============================================================================
212  {
213  // Use the default (DenseLU) solver to perform the backsubstitution
214  static_cast<DenseLU*>(Default_linear_solver_pt)->backsub(rhs, rhs);
215  }
216 
217 
218  //============================================================================
219  /// Determine eigenvalues and eigenvectors, using
220  /// Jacobi rotations. Only for symmetric matrices. Nothing gets overwritten!
221  /// - \c eigen_vect(i,j) = j-th component of i-th eigenvector.
222  /// - \c eigen_val[i] is the i-th eigenvalue; same ordering as in eigenvectors
223  //============================================================================
225  Vector<double>& eigen_vals, DenseMatrix<double>& eigen_vect) const
226  {
227 #ifdef PARANOID
228  // Check Matrix is square
229  if (N != M)
230  {
231  throw OomphLibError(
232  "This matrix is not square, the matrix MUST be square!",
233  OOMPH_CURRENT_FUNCTION,
234  OOMPH_EXCEPTION_LOCATION);
235  }
236 #endif
237  // Make a copy of the matrix & check that it's symmetric
238 
239  // Check that the sizes of eigen_vals and eigen_vect are correct. If not
240  // correct them.
241  if (eigen_vals.size() != N)
242  {
243  eigen_vals.resize(N);
244  }
245  if (eigen_vect.ncol() != N || eigen_vect.nrow() != N)
246  {
247  eigen_vect.resize(N);
248  }
249 
250  DenseDoubleMatrix working_matrix(N);
251  for (unsigned long i = 0; i < N; i++)
252  {
253  for (unsigned long j = 0; j < M; j++)
254  {
255 #ifdef PARANOID
256  if (Matrixdata[M * i + j] != Matrixdata[M * j + i])
257  {
258  throw OomphLibError(
259  "Matrix needs to be symmetric for eigenvalues_by_jacobi()",
260  OOMPH_CURRENT_FUNCTION,
261  OOMPH_EXCEPTION_LOCATION);
262  }
263 #endif
264  working_matrix(i, j) = (*this)(i, j);
265  }
266  }
267 
268  DenseDoubleMatrix aux_eigen_vect(N);
269 
270  throw OomphLibError("Sorry JacobiEigenSolver::jacobi() removed because of "
271  "licencing problems.",
272  OOMPH_CURRENT_FUNCTION,
273  OOMPH_EXCEPTION_LOCATION);
274 
275  // // Call eigensolver
276  // unsigned long nrot;
277  // JacobiEigenSolver::jacobi(working_matrix, eigen_vals, aux_eigen_vect,
278  // nrot);
279 
280  // Copy across (and transpose)
281  for (unsigned long i = 0; i < N; i++)
282  {
283  for (unsigned long j = 0; j < M; j++)
284  {
285  eigen_vect(i, j) = aux_eigen_vect(j, i);
286  }
287  }
288  }
289 
290 
291  //============================================================================
292  /// Multiply the matrix by the vector x: soln=Ax
293  //============================================================================
295  DoubleVector& soln) const
296  {
297 #ifdef PARANOID
298  // Check to see if x.size() = ncol().
299  if (x.nrow() != this->ncol())
300  {
301  std::ostringstream error_message_stream;
302  error_message_stream << "The x vector is not the right size. It is "
303  << x.nrow() << ", it should be " << this->ncol()
304  << std::endl;
305  throw OomphLibError(error_message_stream.str(),
306  OOMPH_CURRENT_FUNCTION,
307  OOMPH_EXCEPTION_LOCATION);
308  }
309  // check that x is not distributed
310  if (x.distributed())
311  {
312  std::ostringstream error_message_stream;
313  error_message_stream
314  << "The x vector cannot be distributed for DenseDoubleMatrix "
315  << "matrix-vector multiply" << std::endl;
316  throw OomphLibError(error_message_stream.str(),
317  OOMPH_CURRENT_FUNCTION,
318  OOMPH_EXCEPTION_LOCATION);
319  }
320  // if soln is setup...
321  if (soln.built())
322  {
323  // check that soln is not distributed
324  if (soln.distributed())
325  {
326  std::ostringstream error_message_stream;
327  error_message_stream
328  << "The x vector cannot be distributed for DenseDoubleMatrix "
329  << "matrix-vector multiply" << std::endl;
330  throw OomphLibError(error_message_stream.str(),
331  OOMPH_CURRENT_FUNCTION,
332  OOMPH_EXCEPTION_LOCATION);
333  }
334  if (soln.nrow() != this->nrow())
335  {
336  std::ostringstream error_message_stream;
337  error_message_stream
338  << "The soln vector is setup and therefore must have the same "
339  << "number of rows as the matrix";
340  throw OomphLibError(error_message_stream.str(),
341  OOMPH_CURRENT_FUNCTION,
342  OOMPH_EXCEPTION_LOCATION);
343  }
344  if (*x.distribution_pt()->communicator_pt() !=
345  *soln.distribution_pt()->communicator_pt())
346  {
347  std::ostringstream error_message_stream;
348  error_message_stream
349  << "The soln vector and the x vector must have the same communicator"
350  << std::endl;
351  throw OomphLibError(error_message_stream.str(),
352  OOMPH_CURRENT_FUNCTION,
353  OOMPH_EXCEPTION_LOCATION);
354  }
355  }
356 #endif
357 
358  // if soln is not setup then setup the distribution
359  if (!soln.built())
360  {
362  x.distribution_pt()->communicator_pt(), this->nrow(), false);
363  soln.build(&dist, 0.0);
364  }
365 
366  // Initialise the solution
367  soln.initialise(0.0);
368 
369  // Multiply the matrix A, by the vector x
370  const double* x_pt = x.values_pt();
371  double* soln_pt = soln.values_pt();
372  for (unsigned long i = 0; i < N; i++)
373  {
374  for (unsigned long j = 0; j < M; j++)
375  {
376  soln_pt[i] += Matrixdata[M * i + j] * x_pt[j];
377  }
378  }
379  }
380 
381 
382  //=================================================================
383  /// Multiply the transposed matrix by the vector x: soln=A^T x
384  //=================================================================
386  DoubleVector& soln) const
387  {
388 #ifdef PARANOID
389  // Check to see if x.size() = ncol().
390  if (x.nrow() != this->nrow())
391  {
392  std::ostringstream error_message_stream;
393  error_message_stream << "The x vector is not the right size. It is "
394  << x.nrow() << ", it should be " << this->nrow()
395  << std::endl;
396  throw OomphLibError(error_message_stream.str(),
397  OOMPH_CURRENT_FUNCTION,
398  OOMPH_EXCEPTION_LOCATION);
399  }
400  // check that x is not distributed
401  if (x.distributed())
402  {
403  std::ostringstream error_message_stream;
404  error_message_stream
405  << "The x vector cannot be distributed for DenseDoubleMatrix "
406  << "matrix-vector multiply" << std::endl;
407  throw OomphLibError(error_message_stream.str(),
408  OOMPH_CURRENT_FUNCTION,
409  OOMPH_EXCEPTION_LOCATION);
410  }
411  // if soln is setup...
412  if (soln.built())
413  {
414  // check that soln is not distributed
415  if (soln.distributed())
416  {
417  std::ostringstream error_message_stream;
418  error_message_stream
419  << "The x vector cannot be distributed for DenseDoubleMatrix "
420  << "matrix-vector multiply" << std::endl;
421  throw OomphLibError(error_message_stream.str(),
422  OOMPH_CURRENT_FUNCTION,
423  OOMPH_EXCEPTION_LOCATION);
424  }
425  if (soln.nrow() != this->ncol())
426  {
427  std::ostringstream error_message_stream;
428  error_message_stream
429  << "The soln vector is setup and therefore must have the same "
430  << "number of columns as the matrix";
431  throw OomphLibError(error_message_stream.str(),
432  OOMPH_CURRENT_FUNCTION,
433  OOMPH_EXCEPTION_LOCATION);
434  }
435  if (*soln.distribution_pt()->communicator_pt() !=
437  {
438  std::ostringstream error_message_stream;
439  error_message_stream
440  << "The soln vector and the x vector must have the same communicator"
441  << std::endl;
442  throw OomphLibError(error_message_stream.str(),
443  OOMPH_CURRENT_FUNCTION,
444  OOMPH_EXCEPTION_LOCATION);
445  }
446  }
447 #endif
448 
449  // if soln is not setup then setup the distribution
450  if (!soln.built())
451  {
453  x.distribution_pt()->communicator_pt(), this->ncol(), false);
454  soln.build(dist_pt, 0.0);
455  delete dist_pt;
456  }
457 
458  // Initialise the solution
459  soln.initialise(0.0);
460 
461  // Matrix vector product
462  double* soln_pt = soln.values_pt();
463  const double* x_pt = x.values_pt();
464  for (unsigned long i = 0; i < N; i++)
465  {
466  for (unsigned long j = 0; j < M; j++)
467  {
468  soln_pt[j] += Matrixdata[N * i + j] * x_pt[i];
469  }
470  }
471  }
472 
473 
474  //=================================================================
475  /// For every row, find the maximum absolute value of the
476  /// entries in this row. Set all values that are less than alpha times
477  /// this maximum to zero and return the resulting matrix in
478  /// reduced_matrix. Note: Diagonal entries are retained regardless
479  /// of their size.
480  //=================================================================
481  void DenseDoubleMatrix::matrix_reduction(const double& alpha,
482  DenseDoubleMatrix& reduced_matrix)
483  {
484  reduced_matrix.resize(N, M, 0.0);
485  // maximum value in a row
486  double max_row;
487 
488  // Loop over rows
489  for (unsigned i = 0; i < N; i++)
490  {
491  // Initialise max value in row
492  max_row = 0.0;
493 
494  // Loop over entries in columns
495  for (unsigned long j = 0; j < M; j++)
496  {
497  // Find max. value in row
498  if (std::fabs(Matrixdata[M * i + j]) > max_row)
499  {
500  max_row = std::fabs(Matrixdata[M * i + j]);
501  }
502  }
503 
504  // Decide if we need to retain the entries in the row
505  for (unsigned long j = 0; j < M; j++)
506  {
507  // If we're on the diagonal or the value is sufficiently large: retain
508  // i.e. copy across.
509  if (i == j || std::fabs(Matrixdata[M * i + j]) > alpha * max_row)
510  {
511  reduced_matrix(i, j) = Matrixdata[M * i + j];
512  }
513  }
514  }
515  }
516 
517 
518  //=============================================================================
519  /// Function to multiply this matrix by the DenseDoubleMatrix matrix_in.
520  //=============================================================================
522  DenseDoubleMatrix& result)
523  {
524 #ifdef PARANOID
525  // check matrix dimensions are compatable
526  if (this->ncol() != matrix_in.nrow())
527  {
528  std::ostringstream error_message;
529  error_message
530  << "Matrix dimensions incompatable for matrix-matrix multiplication"
531  << "ncol() for first matrix:" << this->ncol()
532  << "nrow() for second matrix: " << matrix_in.nrow();
533 
534  throw OomphLibError(
535  error_message.str(), OOMPH_CURRENT_FUNCTION, OOMPH_EXCEPTION_LOCATION);
536  }
537 #endif
538 
539  // NB N is number of rows!
540  unsigned long n_row = this->nrow();
541  unsigned long m_col = matrix_in.ncol();
542 
543  // resize and intialize result
544  result.resize(n_row, m_col, 0.0);
545 
546  // clock_t clock1 = clock();
547 
548  // do calculation
549  unsigned long n_col = this->ncol();
550  for (unsigned long k = 0; k < n_col; k++)
551  {
552  for (unsigned long i = 0; i < n_row; i++)
553  {
554  for (unsigned long j = 0; j < m_col; j++)
555  {
556  result(i, j) += Matrixdata[m_col * i + k] * matrix_in(k, j);
557  }
558  }
559  }
560  }
561 
562 
563  /// ////////////////////////////////////////////////////////////////////////////
564  /// ////////////////////////////////////////////////////////////////////////////
565  /// ////////////////////////////////////////////////////////////////////////////
566 
567 
568  //=======================================================================
569  /// Default constructor, set the default linear solver and
570  /// matrix-matrix multiplication method.
571  //========================================================================
573  {
576  }
577 
578  //========================================================================
579  /// Constructor: Pass vector of values, vector of row indices,
580  /// vector of column starts and number of rows (can be suppressed
581  /// for square matrices). Number of nonzero entries is read
582  /// off from value, so make sure the vector has been shrunk
583  /// to its correct length.
584  //=======================================================================
586  const Vector<int>& row_index,
587  const Vector<int>& column_start,
588  const unsigned long& n,
589  const unsigned long& m)
590  : CCMatrix<double>(value, row_index, column_start, n, m)
591  {
594  }
595 
596  /// Destructor: delete the default linear solver
598  {
600  }
601 
602 
603  //===================================================================
604  /// Perform LU decomposition. Return the sign of the determinant
605  //===================================================================
607  {
608  static_cast<SuperLUSolver*>(Default_linear_solver_pt)->factorise(this);
609  }
610 
611  //===================================================================
612  /// Do the backsubstitution
613  //===================================================================
615  {
616  static_cast<SuperLUSolver*>(Default_linear_solver_pt)->backsub(rhs, rhs);
617  }
618 
619  //===================================================================
620  /// Multiply the matrix by the vector x
621  //===================================================================
623  {
624 #ifdef PARANOID
625  // Check to see if x.size() = ncol().
626  if (x.nrow() != this->ncol())
627  {
628  std::ostringstream error_message_stream;
629  error_message_stream << "The x vector is not the right size. It is "
630  << x.nrow() << ", it should be " << this->ncol()
631  << std::endl;
632  throw OomphLibError(error_message_stream.str(),
633  OOMPH_CURRENT_FUNCTION,
634  OOMPH_EXCEPTION_LOCATION);
635  }
636  // check that x is not distributed
637  if (x.distributed())
638  {
639  std::ostringstream error_message_stream;
640  error_message_stream
641  << "The x vector cannot be distributed for CCDoubleMatrix "
642  << "matrix-vector multiply" << std::endl;
643  throw OomphLibError(error_message_stream.str(),
644  OOMPH_CURRENT_FUNCTION,
645  OOMPH_EXCEPTION_LOCATION);
646  }
647  // if soln is setup...
648  if (soln.built())
649  {
650  // check that soln is not distributed
651  if (soln.distributed())
652  {
653  std::ostringstream error_message_stream;
654  error_message_stream
655  << "The x vector cannot be distributed for CCDoubleMatrix "
656  << "matrix-vector multiply" << std::endl;
657  throw OomphLibError(error_message_stream.str(),
658  OOMPH_CURRENT_FUNCTION,
659  OOMPH_EXCEPTION_LOCATION);
660  }
661  if (soln.nrow() != this->nrow())
662  {
663  std::ostringstream error_message_stream;
664  error_message_stream
665  << "The soln vector is setup and therefore must have the same "
666  << "number of rows as the matrix";
667  throw OomphLibError(error_message_stream.str(),
668  OOMPH_CURRENT_FUNCTION,
669  OOMPH_EXCEPTION_LOCATION);
670  }
671  if (*soln.distribution_pt()->communicator_pt() !=
673  {
674  std::ostringstream error_message_stream;
675  error_message_stream
676  << "The soln vector and the x vector must have the same communicator"
677  << std::endl;
678  throw OomphLibError(error_message_stream.str(),
679  OOMPH_CURRENT_FUNCTION,
680  OOMPH_EXCEPTION_LOCATION);
681  }
682  }
683 #endif
684 
685  // if soln is not setup then setup the distribution
686  if (!soln.built())
687  {
689  x.distribution_pt()->communicator_pt(), this->nrow(), false);
690  soln.build(dist_pt, 0.0);
691  delete dist_pt;
692  }
693 
694  // zero
695  soln.initialise(0.0);
696 
697  // multiply
698  double* soln_pt = soln.values_pt();
699  const double* x_pt = x.values_pt();
700  for (unsigned long j = 0; j < N; j++)
701  {
702  for (long k = Column_start[j]; k < Column_start[j + 1]; k++)
703  {
704  unsigned long i = Row_index[k];
705  double a_ij = Value[k];
706  soln_pt[i] += a_ij * x_pt[j];
707  }
708  }
709  }
710 
711 
712  //=================================================================
713  /// Multiply the transposed matrix by the vector x: soln=A^T x
714  //=================================================================
716  DoubleVector& soln) const
717  {
718 #ifdef PARANOID
719  // Check to see if x.size() = ncol().
720  if (x.nrow() != this->nrow())
721  {
722  std::ostringstream error_message_stream;
723  error_message_stream << "The x vector is not the right size. It is "
724  << x.nrow() << ", it should be " << this->nrow()
725  << std::endl;
726  throw OomphLibError(error_message_stream.str(),
727  OOMPH_CURRENT_FUNCTION,
728  OOMPH_EXCEPTION_LOCATION);
729  }
730  // check that x is not distributed
731  if (x.distributed())
732  {
733  std::ostringstream error_message_stream;
734  error_message_stream
735  << "The x vector cannot be distributed for CCDoubleMatrix "
736  << "matrix-vector multiply" << std::endl;
737  throw OomphLibError(error_message_stream.str(),
738  OOMPH_CURRENT_FUNCTION,
739  OOMPH_EXCEPTION_LOCATION);
740  }
741  // if soln is setup...
742  if (soln.built())
743  {
744  // check that soln is not distributed
745  if (soln.distributed())
746  {
747  std::ostringstream error_message_stream;
748  error_message_stream
749  << "The x vector cannot be distributed for CCDoubleMatrix "
750  << "matrix-vector multiply" << std::endl;
751  throw OomphLibError(error_message_stream.str(),
752  OOMPH_CURRENT_FUNCTION,
753  OOMPH_EXCEPTION_LOCATION);
754  }
755  if (soln.nrow() != this->ncol())
756  {
757  std::ostringstream error_message_stream;
758  error_message_stream
759  << "The soln vector is setup and therefore must have the same "
760  << "number of columns as the matrix";
761  throw OomphLibError(error_message_stream.str(),
762  OOMPH_CURRENT_FUNCTION,
763  OOMPH_EXCEPTION_LOCATION);
764  }
765  if (*soln.distribution_pt()->communicator_pt() !=
767  {
768  std::ostringstream error_message_stream;
769  error_message_stream
770  << "The soln vector and the x vector must have the same communicator"
771  << std::endl;
772  throw OomphLibError(error_message_stream.str(),
773  OOMPH_CURRENT_FUNCTION,
774  OOMPH_EXCEPTION_LOCATION);
775  }
776  }
777 #endif
778 
779  // if soln is not setup then setup the distribution
780  if (!soln.built())
781  {
783  x.distribution_pt()->communicator_pt(), this->ncol(), false);
784  soln.build(dist_pt, 0.0);
785  delete dist_pt;
786  }
787 
788  // zero
789  soln.initialise(0.0);
790 
791  // Matrix vector product
792  double* soln_pt = soln.values_pt();
793  const double* x_pt = x.values_pt();
794  for (unsigned long i = 0; i < N; i++)
795  {
796  for (long k = Column_start[i]; k < Column_start[i + 1]; k++)
797  {
798  unsigned long j = Row_index[k];
799  double a_ij = Value[k];
800  soln_pt[j] += a_ij * x_pt[i];
801  }
802  }
803  }
804 
805 
806  //===========================================================================
807  /// Function to multiply this matrix by the CCDoubleMatrix matrix_in
808  /// The multiplication method used can be selected using the flag
809  /// Matrix_matrix_multiply_method. By default Method 2 is used.
810  /// Method 1: First runs through this matrix and matrix_in to find the storage
811  /// requirements for result - arrays of the correct size are
812  /// then allocated before performing the calculation.
813  /// Minimises memory requirements but more costly.
814  /// Method 2: Grows storage for values and column indices of result 'on the
815  /// fly' using an array of maps. Faster but more memory
816  /// intensive.
817  /// Method 3: Grows storage for values and column indices of result 'on the
818  /// fly' using a vector of vectors. Not particularly impressive
819  /// on the platforms we tried...
820  //=============================================================================
822  CCDoubleMatrix& result)
823  {
824 #ifdef PARANOID
825  // check matrix dimensions are compatible
826  if (this->ncol() != matrix_in.nrow())
827  {
828  std::ostringstream error_message;
829  error_message
830  << "Matrix dimensions incompatable for matrix-matrix multiplication"
831  << "ncol() for first matrix:" << this->ncol()
832  << "nrow() for second matrix: " << matrix_in.nrow();
833 
834  throw OomphLibError(
835  error_message.str(), OOMPH_CURRENT_FUNCTION, OOMPH_EXCEPTION_LOCATION);
836  }
837 #endif
838 
839  // NB N is number of rows!
840  unsigned long N = this->nrow();
841  unsigned long M = matrix_in.ncol();
842  unsigned long Nnz = 0;
843 
844  // pointers to arrays which store result
845  int* Column_start;
846  double* Value;
847  int* Row_index;
848 
849  // get pointers to matrix_in
850  const int* matrix_in_col_start = matrix_in.column_start();
851  const int* matrix_in_row_index = matrix_in.row_index();
852  const double* matrix_in_value = matrix_in.value();
853 
854  // get pointers to this matrix
855  const double* this_value = this->value();
856  const int* this_col_start = this->column_start();
857  const int* this_row_index = this->row_index();
858 
859  // set method
860  unsigned method = Matrix_matrix_multiply_method;
861 
862  // clock_t clock1 = clock();
863 
864  // METHOD 1
865  // --------
866  if (method == 1)
867  {
868  // allocate storage for column starts
869  Column_start = new int[M + 1];
870  Column_start[0] = 0;
871 
872  // a set to store number of non-zero rows in each column of result
873  std::set<unsigned> rows;
874 
875  // run through columns of this matrix and matrix_in to find number of
876  // non-zero entries in each column of result
877  for (unsigned long this_col = 0; this_col < M; this_col++)
878  {
879  // run through non-zeros in this_col of this matrix
880  for (int this_ptr = this_col_start[this_col];
881  this_ptr < this_col_start[this_col + 1];
882  this_ptr++)
883  {
884  // find row index for non-zero
885  unsigned matrix_in_col = this_row_index[this_ptr];
886 
887  // run through corresponding column in matrix_in
888  for (int matrix_in_ptr = matrix_in_col_start[matrix_in_col];
889  matrix_in_ptr < matrix_in_col_start[matrix_in_col + 1];
890  matrix_in_ptr++)
891  {
892  // find row index for non-zero in matrix_in and store in rows
893  rows.insert(matrix_in_row_index[matrix_in_ptr]);
894  }
895  }
896  // update Column_start
897  Column_start[this_col + 1] = Column_start[this_col] + rows.size();
898 
899  // wipe values in rows
900  rows.clear();
901  }
902 
903  // set Nnz
904  Nnz = Column_start[M];
905 
906  // allocate arrays for result
907  Value = new double[Nnz];
908  Row_index = new int[Nnz];
909 
910  // set all values of Row_index to -1
911  for (unsigned long i = 0; i < Nnz; i++) Row_index[i] = -1;
912 
913  // Calculate values for result - first run through columns of this matrix
914  for (unsigned long this_col = 0; this_col < M; this_col++)
915  {
916  // run through non-zeros in this_column
917  for (int this_ptr = this_col_start[this_col];
918  this_ptr < this_col_start[this_col + 1];
919  this_ptr++)
920  {
921  // find value of non-zero
922  double this_val = this_value[this_ptr];
923 
924  // find row associated with non-zero
925  unsigned matrix_in_col = this_row_index[this_ptr];
926 
927  // run through corresponding column in matrix_in
928  for (int matrix_in_ptr = matrix_in_col_start[matrix_in_col];
929  matrix_in_ptr < matrix_in_col_start[matrix_in_col + 1];
930  matrix_in_ptr++)
931  {
932  // find row index for non-zero in matrix_in
933  int row = matrix_in_row_index[matrix_in_ptr];
934 
935  // find position in result to insert value
936  for (int ptr = Column_start[this_col];
937  ptr <= Column_start[this_col + 1];
938  ptr++)
939  {
940  if (ptr == Column_start[this_col + 1])
941  {
942  // error - have passed end of column without finding
943  // correct row index
944  std::ostringstream error_message;
945  error_message << "Error inserting value in result";
946 
947  throw OomphLibError(error_message.str(),
948  OOMPH_CURRENT_FUNCTION,
949  OOMPH_EXCEPTION_LOCATION);
950  }
951  else if (Row_index[ptr] == -1)
952  {
953  // first entry for this row index
954  Row_index[ptr] = row;
955  Value[ptr] = this_val * matrix_in_value[matrix_in_ptr];
956  break;
957  }
958  else if (Row_index[ptr] == row)
959  {
960  // row index already exists - add value
961  Value[ptr] += this_val * matrix_in_value[matrix_in_ptr];
962  break;
963  }
964  }
965  }
966  }
967  }
968  }
969 
970  // METHOD 2
971  // --------
972  else if (method == 2)
973  {
974  // generate array of maps to store values for result
975  std::map<int, double>* result_maps = new std::map<int, double>[M];
976 
977  // run through columns of this matrix
978  for (unsigned long this_col = 0; this_col < M; this_col++)
979  {
980  // run through non-zeros in this_col
981  for (int this_ptr = this_col_start[this_col];
982  this_ptr < this_col_start[this_col + 1];
983  this_ptr++)
984  {
985  // find value of non-zero
986  double this_val = this_value[this_ptr];
987 
988  // find row index associated with non-zero
989  unsigned matrix_in_col = this_row_index[this_ptr];
990 
991  // run through corresponding column in matrix_in
992  for (int matrix_in_ptr = matrix_in_col_start[matrix_in_col];
993  matrix_in_ptr < matrix_in_col_start[matrix_in_col + 1];
994  matrix_in_ptr++)
995  {
996  // find row index for non-zero in matrix_in
997  int row = matrix_in_row_index[matrix_in_ptr];
998 
999  // insert value
1000  result_maps[this_col][row] +=
1001  this_val * matrix_in_value[matrix_in_ptr];
1002  }
1003  }
1004  }
1005 
1006  // allocate Column_start
1007  Column_start = new int[M + 1];
1008 
1009  // copy across column starts
1010  Column_start[0] = 0;
1011  for (unsigned long col = 0; col < M; col++)
1012  {
1013  int size = result_maps[col].size();
1014  Column_start[col + 1] = Column_start[col] + size;
1015  }
1016 
1017  // set Nnz
1018  Nnz = Column_start[M];
1019 
1020  // allocate other arrays
1021  Value = new double[Nnz];
1022  Row_index = new int[Nnz];
1023 
1024  // copy values and row indices
1025  for (unsigned long col = 0; col < M; col++)
1026  {
1027  unsigned ptr = Column_start[col];
1028  for (std::map<int, double>::iterator i = result_maps[col].begin();
1029  i != result_maps[col].end();
1030  i++)
1031  {
1032  Row_index[ptr] = i->first;
1033  Value[ptr] = i->second;
1034  ptr++;
1035  }
1036  }
1037 
1038  // tidy up memory
1039  delete[] result_maps;
1040  }
1041 
1042  // METHOD 3
1043  // --------
1044  else if (method == 3)
1045  {
1046  // vectors of vectors to store results
1047  std::vector<std::vector<int>> result_rows(N);
1048  std::vector<std::vector<double>> result_vals(N);
1049 
1050  // run through the columns of this matrix
1051  for (unsigned long this_col = 0; this_col < M; this_col++)
1052  {
1053  // run through non-zeros in this_col
1054  for (int this_ptr = this_col_start[this_col];
1055  this_ptr < this_col_start[this_col + 1];
1056  this_ptr++)
1057  {
1058  // find value of non-zero
1059  double this_val = this_value[this_ptr];
1060 
1061  // find row index associated with non-zero
1062  unsigned matrix_in_col = this_row_index[this_ptr];
1063 
1064  // run through corresponding column in matrix_in
1065  for (int matrix_in_ptr = matrix_in_col_start[matrix_in_col];
1066  matrix_in_ptr < matrix_in_col_start[matrix_in_col + 1];
1067  matrix_in_ptr++)
1068  {
1069  // find row index for non-zero in matrix_in
1070  int row = matrix_in_row_index[matrix_in_ptr];
1071 
1072  // insert value
1073  int size = result_rows[this_col].size();
1074  for (int i = 0; i <= size; i++)
1075  {
1076  if (i == size)
1077  {
1078  // first entry for this row index
1079  result_rows[this_col].push_back(row);
1080  result_vals[this_col].push_back(this_val *
1081  matrix_in_value[matrix_in_ptr]);
1082  }
1083  else if (row == result_rows[this_col][i])
1084  {
1085  // row index already exists
1086  result_vals[this_col][i] +=
1087  this_val * matrix_in_value[matrix_in_ptr];
1088  break;
1089  }
1090  }
1091  }
1092  }
1093  }
1094 
1095  // allocate Column_start
1096  Column_start = new int[M + 1];
1097 
1098  // copy across column starts
1099  Column_start[0] = 0;
1100  for (unsigned long col = 0; col < M; col++)
1101  {
1102  int size = result_rows[col].size();
1103  Column_start[col + 1] = Column_start[col] + size;
1104  }
1105 
1106  // set Nnz
1107  Nnz = Column_start[M];
1108 
1109  // allocate other arrays
1110  Value = new double[Nnz];
1111  Row_index = new int[Nnz];
1112 
1113  // copy across values and row indices
1114  for (unsigned long col = 0; col < N; col++)
1115  {
1116  unsigned ptr = Column_start[col];
1117  unsigned n_rows = result_rows[col].size();
1118  for (unsigned i = 0; i < n_rows; i++)
1119  {
1120  Row_index[ptr] = result_rows[col][i];
1121  Value[ptr] = result_vals[col][i];
1122  ptr++;
1123  }
1124  }
1125  }
1126 
1127  // INCORRECT VALUE FOR METHOD
1128  else
1129  {
1130  std::ostringstream error_message;
1131  error_message << "Incorrect method set in matrix-matrix multiply"
1132  << "method=" << method << " not allowed";
1133 
1134  throw OomphLibError(
1135  error_message.str(), OOMPH_CURRENT_FUNCTION, OOMPH_EXCEPTION_LOCATION);
1136  }
1137 
1139  }
1140 
1141 
1142  //=================================================================
1143  /// For every row, find the maximum absolute value of the
1144  /// entries in this row. Set all values that are less than alpha times
1145  /// this maximum to zero and return the resulting matrix in
1146  /// reduced_matrix. Note: Diagonal entries are retained regardless
1147  /// of their size.
1148  //=================================================================
1149  void CCDoubleMatrix::matrix_reduction(const double& alpha,
1150  CCDoubleMatrix& reduced_matrix)
1151  {
1152  // number of columns in matrix
1153  long n_coln = ncol();
1154 
1155  Vector<double> max_row(nrow(), 0.0);
1156 
1157  // Here's the packed format for the new matrix
1158  Vector<int> B_row_start(1);
1159  Vector<int> B_column_index;
1160  Vector<double> B_value;
1161 
1162 
1163  // k is counter for the number of entries in the reduced matrix
1164  unsigned k = 0;
1165 
1166  // Initialise row start
1167  B_row_start[0] = 0;
1168 
1169  // Loop over columns
1170  for (long i = 0; i < n_coln; i++)
1171  {
1172  // Loop over entries in columns
1173  for (long j = Column_start[i]; j < Column_start[i + 1]; j++)
1174  {
1175  // Find max. value in row
1176  if (std::fabs(Value[j]) > max_row[Row_index[j]])
1177  {
1178  max_row[Row_index[j]] = std::fabs(Value[j]);
1179  }
1180  }
1181 
1182  // Decide if we need to retain the entries in the row
1183  for (long j = Column_start[i]; j < Column_start[i + 1]; j++)
1184  {
1185  // If we're on the diagonal or the value is sufficiently large: retain
1186  // i.e. copy across.
1187  if (i == Row_index[j] ||
1188  std::fabs(Value[j]) > alpha * max_row[Row_index[j]])
1189  {
1190  B_value.push_back(Value[j]);
1191  B_column_index.push_back(Row_index[j]);
1192  k++;
1193  }
1194  }
1195  // This writes the row start for the next row -- equal to
1196  // to the number of entries written so far (C++ zero-based indexing!)
1197  B_row_start.push_back(k);
1198  }
1199 
1200 
1201  // Build the matrix from the compressed format
1202  dynamic_cast<CCDoubleMatrix&>(reduced_matrix)
1203  .build(B_value, B_column_index, B_row_start, nrow(), ncol());
1204  }
1205 
1206 
1207  /// ////////////////////////////////////////////////////////////////////////////
1208  /// ////////////////////////////////////////////////////////////////////////////
1209  /// ////////////////////////////////////////////////////////////////////////////
1210 
1211  //=============================================================================
1212  /// Default constructor
1213  //=============================================================================
1215  {
1216  // set the default solver
1218 
1219  // matrix not built
1220  Built = false;
1221 
1222  // set the serial matrix-matrix multiply method
1223 #ifdef OOMPH_HAS_TRILINOS
1224  // Serial_matrix_matrix_multiply_method = 4;
1226 #else
1228 #endif
1229  }
1230 
1231  //=============================================================================
1232  /// Copy constructor
1233  //=============================================================================
1235  {
1236  // copy the distribution
1237  this->build_distribution(other_matrix.distribution_pt());
1238 
1239  // copy coefficients
1240  const double* values_pt = other_matrix.value();
1241  const int* column_indices = other_matrix.column_index();
1242  const int* row_start = other_matrix.row_start();
1243 
1244  // This is the local nnz.
1245  const unsigned nnz = other_matrix.nnz();
1246 
1247  // Using number of local rows since the underlying CRMatrix is local to
1248  // each processor.
1249  const unsigned nrow_local = other_matrix.nrow_local();
1250 
1251  // Storage for the (yet to be copied) data.
1252  double* my_values_pt = new double[nnz];
1253  int* my_column_indices = new int[nnz];
1254  int* my_row_start = new int[nrow_local + 1];
1255 
1256  // Copying over the data.
1257  std::copy(values_pt, values_pt + nnz, my_values_pt);
1258  std::copy(column_indices, column_indices + nnz, my_column_indices);
1259  std::copy(row_start, row_start + nrow_local + 1, my_row_start);
1260 
1261 
1262  // Build without copy since we have made a deep copy of the data structure.
1263  this->build_without_copy(
1264  other_matrix.ncol(), nnz, my_values_pt, my_column_indices, my_row_start);
1265 
1266  // set the default solver
1268 
1269  // matrix is built
1270  Built = true;
1271 
1272  // set the serial matrix-matrix multiply method
1273 #ifdef OOMPH_HAS_TRILINOS
1274  // Serial_matrix_matrix_multiply_method = 4;
1276 #else
1278 #endif
1279  }
1280 
1281 
1282  //=============================================================================
1283  /// Constructor: just stores the distribution but does not build the
1284  /// matrix
1285  //=============================================================================
1287  const LinearAlgebraDistribution* distribution_pt)
1288  {
1289  this->build_distribution(distribution_pt);
1290 
1291  // set the default solver
1293 
1294  // matrix not built
1295  Built = false;
1296 
1297 // set the serial matrix-matrix multiply method
1298 #ifdef OOMPH_HAS_TRILINOS
1299  // Serial_matrix_matrix_multiply_method = 4;
1301 #else
1303 #endif
1304  }
1305 
1306  //=============================================================================
1307  /// Constructor: Takes the distribution and the number of columns, as
1308  /// well as the vector of values, vector of column indices,vector of row
1309  /// starts.
1310  //=============================================================================
1312  const unsigned& ncol,
1313  const Vector<double>& value,
1314  const Vector<int>& column_index,
1315  const Vector<int>& row_start)
1316  {
1317  // build the compressed row matrix
1318  CR_matrix.build(
1319  value, column_index, row_start, dist_pt->nrow_local(), ncol);
1320 
1321  // store the Distribution
1322  this->build_distribution(dist_pt);
1323 
1324  // set the linear solver
1326 
1327  // set the serial matrix-matrix multiply method
1328 #ifdef OOMPH_HAS_TRILINOS
1329  // Serial_matrix_matrix_multiply_method = 4;
1331 #else
1333 #endif
1334 
1335  // matrix has been built
1336  Built = true;
1337  }
1338 
1339 
1340  //=============================================================================
1341  /// Destructor
1342  //=============================================================================
1344  {
1345  this->clear();
1346  delete Default_linear_solver_pt;
1348  }
1349 
1350  //=============================================================================
1351  /// Rebuild the matrix - assembles an empty matrix with a defined distribution
1352  //=============================================================================
1354  {
1355  this->clear();
1356  this->build_distribution(distribution_pt);
1357  }
1358 
1359  //=============================================================================
1360  /// Runs through the column index vector and checks if the entries
1361  /// are arranged arbitrarily or if they follow the regular lexicographical
1362  /// of matrices. If a boolean argument is provided with the assignment
1363  /// TRUE then information on the first entry which is not in the correct
1364  /// position will also be given
1365  //=============================================================================
1367  const bool& doc_unordered_entries) const
1368  {
1369 #ifdef OOMPH_HAS_MPI
1370  // We only need to produce a warning if the matrix is distributed
1371  if (this->distributed())
1372  {
1373  // Create an ostringstream object to store the warning message
1374  std::ostringstream warning_message;
1375 
1376  // Create the warning messsage
1377  warning_message << "This method currently works for serial but "
1378  << "has not been implemented for use with MPI!\n";
1379 
1380  // Issue the warning
1381  OomphLibWarning(warning_message.str(),
1382  OOMPH_CURRENT_FUNCTION,
1383  OOMPH_EXCEPTION_LOCATION);
1384  }
1385 #endif
1386 
1387  // Get the number of rows in this matrix
1388  unsigned n_rows = this->nrow();
1389 
1390  // Acquire access to the value, row_start and column_index arrays from
1391  // the CR matrix. Since we do not change anything in row_start_pt we
1392  // give it the const prefix
1393  const int* column_index_pt = this->column_index();
1394  const int* row_start_pt = this->row_start();
1395 
1396  // Loop over the rows of matrix
1397  for (unsigned i = 0; i < n_rows; i++)
1398  {
1399  // Calculate the number of nonzeros in the i-th row
1400  unsigned nnz_row_i = *(row_start_pt + i + 1) - *(row_start_pt + i);
1401 
1402  // Get the index of the first entry in row i
1403  unsigned i_row_start = *(row_start_pt + i);
1404 
1405  // Loop over the entries of the i-th row
1406  for (unsigned j = 0; j < nnz_row_i - 1; j++)
1407  {
1408  // Check if the column index of the following entry is greater than the
1409  // current entry
1410  if ((*(column_index_pt + i_row_start + j + 1)) <
1411  (*(column_index_pt + i_row_start + j)))
1412  {
1413  // If the input argument was set to TRUE we document we output
1414  // information of the first entry which is not in the correct position
1415  if (doc_unordered_entries)
1416  {
1417  // Tell the user
1418  oomph_info << "Matrix has not been correctly sorted!"
1419  << "\nOn row: " << i << "\nEntry: " << j
1420  << "\nEntry 1 = " << *(column_index_pt + i_row_start + j)
1421  << "\nEntry 2 = "
1422  << *(column_index_pt + i_row_start + j + 1) << std::endl;
1423  }
1424 
1425  // It hasn't worked
1426  return false;
1427  } // if ((*(column_index_pt+i_row_start+j+1)) ...
1428  } // for (unsigned j=0;j<nnz_row_i-1;j++)
1429  } // for (unsigned i=0;i<n_rows;i++)
1430 
1431  // If it gets here without a warning then the entries in each row of
1432  // the matrix are ordered by increasing column index
1433  return true;
1434  } // End of entries_are_sorted()
1435 
1436  //=============================================================================
1437  /// This helper function sorts the entries in the column index vector
1438  /// and the value vector. During the construction of the matrix the entries
1439  /// were most likely assigned in an arbitrary order. As a result, it cannot
1440  /// be assumed that the entries in the column index vector corresponding to
1441  /// each row of the matrix have been arranged in increasing order. During
1442  /// the setup an additional vector will be set up; Index_of_diagonal_entries.
1443  /// The i-th entry of this vector contains the index of the last entry
1444  /// below or on the diagonal. If there are no entries below or on the
1445  /// diagonal then the corresponding entry is -1. If, however, there are
1446  /// no entries in the row then the entry is irrelevant and is kept
1447  /// as the initialised value; 0.
1448  //=============================================================================
1450  {
1451 #ifdef OOMPH_HAS_MPI
1452  // We only need to produce a warning if the matrix is distributed
1453  if (this->distributed())
1454  {
1455  // Create an ostringstream object to store the warning message
1456  std::ostringstream warning_message;
1457 
1458  // Create the warning messsage
1459  warning_message << "This method currently works for serial but "
1460  << "has not been tested with MPI!\n";
1461 
1462  // Issue the warning
1463  OomphLibWarning(warning_message.str(),
1464  OOMPH_CURRENT_FUNCTION,
1465  OOMPH_EXCEPTION_LOCATION);
1466  }
1467 #endif
1468 
1469  // Get the number of rows in the matrix
1470  unsigned n_rows = this->nrow();
1471 
1472  // Acquire access to the value, row_start and column_index arrays from
1473  // the CR matrix. Since we do not change anything in row_start_pt we
1474  // give it the const prefix
1475  double* value_pt = this->value();
1476  int* column_index_pt = this->column_index();
1477  const int* row_start_pt = this->row_start();
1478 
1479  // Resize the Index_of_diagonal_entries vector
1480  Index_of_diagonal_entries.resize(n_rows, 0);
1481 
1482  // Vector of pairs to store the column_index of each value in the i-th row
1483  // and its corresponding matrix entry
1484  Vector<std::pair<int, double>> column_index_and_value_row_i;
1485 
1486  // Loop over the rows of the matrix
1487  for (unsigned i = 0; i < n_rows; i++)
1488  {
1489  // Find the number of nonzeros in the i-th row
1490  unsigned nnz_row_i = *(row_start_pt + i + 1) - *(row_start_pt + i);
1491 
1492  // Variable to store the start of the i-th row
1493  unsigned i_row_start = *(row_start_pt + i);
1494 
1495  // If there are no nonzeros in this row then the i-th entry of the vector
1496  // Index_of_diagonal_entries is irrelevant so we can simply let it be 0
1497  if (nnz_row_i == 0)
1498  {
1499  // Set the i-th entry
1501  }
1502  // If there are nonzeros in the i-th row
1503  else
1504  {
1505  // If there is more than one entry in the row resize the vector
1506  // column_index_and_value_row_i
1507  column_index_and_value_row_i.resize(nnz_row_i);
1508 
1509  // Loop over the entries in the row
1510  for (unsigned j = 0; j < nnz_row_i; j++)
1511  {
1512  // Assign the appropriate entries to column_index_and_value_row_i
1513  column_index_and_value_row_i[j] =
1514  std::make_pair(*(column_index_pt + i_row_start + j),
1515  *(value_pt + i_row_start + j));
1516  }
1517 
1518  // Sort the vector of pairs using the struct
1519  // CRDoubleMatrixComparisonHelper
1520  std::sort(column_index_and_value_row_i.begin(),
1521  column_index_and_value_row_i.end(),
1523 
1524  //-----------------------------------------------------------------------
1525  // Now that the entries of the i-th row have been sorted we can read
1526  // them back into value_pt and column_index_pt:
1527  //-----------------------------------------------------------------------
1528 
1529  // Create a boolean variable to indicate whether or not the i-th entry
1530  // of Index_of_diagonal_entries has been set
1531  bool is_ith_entry_set = false;
1532 
1533  // Loop over the entries in the vector column_index_and_value_row_i and
1534  // assign its entries to value_pt and column_index_pt
1535  for (unsigned j = 0; j < nnz_row_i; j++)
1536  {
1537  // Set the column index of the j-th nonzero value in the i-th row of
1538  // the matrix
1539  *(column_index_pt + i_row_start + j) =
1540  column_index_and_value_row_i[j].first;
1541 
1542  // Set the value of the j-th nonzero value in the i-th row of
1543  // the matrix
1544  *(value_pt + i_row_start + j) =
1545  column_index_and_value_row_i[j].second;
1546 
1547  // This if statement is used to set the i-th entry of the vector
1548  // Index_of_diagonal_entries if it has not yet been set
1549  if (!is_ith_entry_set)
1550  {
1551  // If the column index of the first entry in row i is greater than
1552  // the row number then the first entry must lie above the diagonal
1553  if (unsigned(*(column_index_pt + i_row_start)) > i)
1554  {
1555  // If the column index of the first entry in the row is greater
1556  // than the row number, i, then the i-th entry of
1557  // Index_of_diagonal_entries needs to be set to -1 to indicate
1558  // there are no entries below or on the diagonal
1560 
1561  // Indicate that the i-th entry of Index_of_diagonal_entries has
1562  // been set
1563  is_ith_entry_set = true;
1564  }
1565  // If there are entries below or on the diagonal
1566  else
1567  {
1568  // If there is only one entry in the row then we know that this
1569  // will be the last entry below or on the diagonal because we have
1570  // eliminated the possibility that if there is only one entry,
1571  // that it lies above the diagonal
1572  if (nnz_row_i == 1)
1573  {
1574  // Set the index of the current entry to be the value of i-th
1575  // entry of Index_of_diagonal_entries
1576  Index_of_diagonal_entries[i] = i_row_start + j;
1577 
1578  // Indicate that the i-th entry of Index_of_diagonal_entries has
1579  // been set
1580  is_ith_entry_set = true;
1581  }
1582  // It remains to now check the case that there is more than one
1583  // entry in the row. If there is more than one entry in the row
1584  // and there are entries below or on the diagonal then we have
1585  // three cases:
1586  // (1) The current entry lies on the diagonal;
1587  // (2) The current entry lies above the diagonal;
1588  // (3) The current entry lies below the diagonal;
1589  // The first case can easily be checked as done below. If the
1590  // second case occurs then we have just passed the last entry. We
1591  // know this because at least one entry lies on or below the
1592  // diagonal. If the second case it true then we need to assign the
1593  // previous entry to the vector Index_of_diagonal_entries.
1594  // Finally, we are left with case (3), which can be split into two
1595  // cases:
1596  // (3.1) The current entry lies below the diagonal but it
1597  // is not the last entry below or on the diagonal;
1598  // (3.2) The current entry lies below the diagonal and is
1599  // the last entry below or on the diagonal.
1600  // If case (3.1) holds then we can simply wait until we get to the
1601  // next entry in the row and examine that. If the next entry lies
1602  // on the diagonal then it will be swept up by case (1). If the
1603  // next entry lies above the diagonal then case (2) will sweep it
1604  // up and if neither is the case then we wait until the next entry
1605  // and so on. If, instead, case (3.2) holds then our last check
1606  // simply needs to check if the current entry is the last entry in
1607  // the row because if the last entry lies on the diagonal, case
1608  // (1) will sweep it up. If it lies above the diagonal, case (2)
1609  // will take care of it. Therefore, the only remaining case is
1610  // that it lies strictly below the diagonal and since it is the
1611  // last entry in the row it means the index of this entry needs to
1612  // be assigned to Index_of_diagonal_entries
1613 
1614  // Case (1) : The current entry lies on the diagonal
1615  else if (unsigned(*(column_index_pt + i_row_start + j)) == i)
1616  {
1617  // Set the index of the current entry to be the value of i-th
1618  // entry of Index_of_diagonal_entries
1619  Index_of_diagonal_entries[i] = i_row_start + j;
1620 
1621  // Indicate that the i-th entry of Index_of_diagonal_entries has
1622  // been set
1623  is_ith_entry_set = true;
1624  }
1625  // Case (2) : The current entry lies above the diagonal
1626  else if (unsigned(*(column_index_pt + i_row_start + j)) > i)
1627  {
1628  // Set the index of the current entry to be the value of i-th
1629  // entry of Index_of_diagonal_entries
1630  Index_of_diagonal_entries[i] = i_row_start + j - 1;
1631 
1632  // Indicate that the i-th entry of Index_of_diagonal_entries has
1633  // been set
1634  is_ith_entry_set = true;
1635  }
1636  // Case (3.2) : The current entry is the last entry in the row
1637  else if (j == nnz_row_i - 1)
1638  {
1639  // Set the index of the current entry to be the value of i-th
1640  // entry of Index_of_diagonal_entries
1641  Index_of_diagonal_entries[i] = i_row_start + j;
1642 
1643  // Indicate that the i-th entry of Index_of_diagonal_entries has
1644  // been set
1645  is_ith_entry_set = true;
1646  } // if (nnz_row_i==1) else if
1647  } // if (*(column_index_pt+i_row_start)>i)
1648  } // if (!is_ith_entry_set)
1649  } // for (unsigned j=0;j<nnz_row_i;j++)
1650  } // if (nnz_row_i==0) else
1651  } // for (unsigned i=0;i<n_rows;i++)
1652  } // End of sort_entries()
1653 
1654  //=============================================================================
1655  /// Clean method
1656  //=============================================================================
1658  {
1659  this->clear_distribution();
1661  Built = false;
1662 
1663  if (Linear_solver_pt != 0) // Only clean up if it exists
1665  }
1666 
1667  //=============================================================================
1668  /// build method: Takes the distribution and the number of columns, as
1669  /// well as the vector of values, vector of column indices,vector of row
1670  /// starts.
1671  //=============================================================================
1673  const unsigned& ncol,
1674  const Vector<double>& value,
1675  const Vector<int>& column_index,
1676  const Vector<int>& row_start)
1677  {
1678  // clear
1679  this->clear();
1680 
1681  // store the Distribution
1682  this->build_distribution(distribution_pt);
1683 
1684  // set the linear solver
1686 
1687  // now build the matrix
1688  this->build(ncol, value, column_index, row_start);
1689  }
1690 
1691  //=============================================================================
1692  /// method to rebuild the matrix, but not the distribution
1693  //=============================================================================
1694  void CRDoubleMatrix::build(const unsigned& ncol,
1695  const Vector<double>& value,
1696  const Vector<int>& column_index,
1697  const Vector<int>& row_start)
1698  {
1699  // call the underlying build method
1702 
1703  // matrix has been build
1704  Built = true;
1705  }
1706 
1707  //=============================================================================
1708  /// method to rebuild the matrix, but not the distribution
1709  //=============================================================================
1710  void CRDoubleMatrix::build_without_copy(const unsigned& ncol,
1711  const unsigned& nnz,
1712  double* value,
1713  int* column_index,
1714  int* row_start)
1715  {
1716  // call the underlying build method
1719  value, column_index, row_start, nnz, this->nrow_local(), ncol);
1720 
1721  // matrix has been build
1722  Built = true;
1723  }
1724 
1725  //=============================================================================
1726  /// Do LU decomposition
1727  //=============================================================================
1729  {
1730 #ifdef PARANOID
1731  // check that the this matrix is built
1732  if (!Built)
1733  {
1734  std::ostringstream error_message_stream;
1735  error_message_stream << "This matrix has not been built.";
1736  throw OomphLibError(error_message_stream.str(),
1737  OOMPH_CURRENT_FUNCTION,
1738  OOMPH_EXCEPTION_LOCATION);
1739  }
1740 #endif
1741 
1742  // factorise using superlu or superlu dist if we oomph has mpi
1743  static_cast<SuperLUSolver*>(Default_linear_solver_pt)->factorise(this);
1744  }
1745 
1746  //=============================================================================
1747  /// Do back-substitution
1748  //=============================================================================
1750  {
1751 #ifdef PARANOID
1752  // check that the rhs vector is setup
1753  if (!rhs.built())
1754  {
1755  std::ostringstream error_message_stream;
1756  error_message_stream << "The vector rhs has not been setup";
1757  throw OomphLibError(error_message_stream.str(),
1758  OOMPH_CURRENT_FUNCTION,
1759  OOMPH_EXCEPTION_LOCATION);
1760  }
1761  // check that the rhs vector has the same distribution as this matrix
1762  if (!(*this->distribution_pt() == *rhs.distribution_pt()))
1763  {
1764  std::ostringstream error_message_stream;
1765  error_message_stream
1766  << "The vector rhs must have the same distribution as the matrix";
1767  throw OomphLibError(error_message_stream.str(),
1768  OOMPH_CURRENT_FUNCTION,
1769  OOMPH_EXCEPTION_LOCATION);
1770  }
1771 #endif
1772 
1773  // backsub
1774  DoubleVector rhs_copy(rhs);
1776  ->backsub(rhs_copy, rhs);
1777  }
1778 
1779  //=============================================================================
1780  /// Multiply the matrix by the vector x
1781  //=============================================================================
1783  {
1784 #ifdef PARANOID
1785  // check that this matrix is built
1786  if (!Built)
1787  {
1788  std::ostringstream error_message_stream;
1789  error_message_stream << "This matrix has not been built";
1790  throw OomphLibError(error_message_stream.str(),
1791  OOMPH_CURRENT_FUNCTION,
1792  OOMPH_EXCEPTION_LOCATION);
1793  }
1794  // check that the distribution of x is setup
1795  if (!x.built())
1796  {
1797  std::ostringstream error_message_stream;
1798  error_message_stream << "The distribution of the vector x must be setup";
1799  throw OomphLibError(error_message_stream.str(),
1800  OOMPH_CURRENT_FUNCTION,
1801  OOMPH_EXCEPTION_LOCATION);
1802  }
1803  // Check to see if x.size() = ncol().
1804  if (this->ncol() != x.distribution_pt()->nrow())
1805  {
1806  std::ostringstream error_message_stream;
1807  error_message_stream << "The number of rows in the x vector and the "
1808  "number of columns in the "
1809  << "matrix must be the same";
1810  throw OomphLibError(error_message_stream.str(),
1811  OOMPH_CURRENT_FUNCTION,
1812  OOMPH_EXCEPTION_LOCATION);
1813  }
1814  // if the soln is distributed
1815  if (soln.built())
1816  {
1817  if (!(*soln.distribution_pt() == *this->distribution_pt()))
1818  {
1819  std::ostringstream error_message_stream;
1820  error_message_stream
1821  << "The soln vector is setup and therefore must have the same "
1822  << "distribution as the matrix";
1823  throw OomphLibError(error_message_stream.str(),
1824  OOMPH_CURRENT_FUNCTION,
1825  OOMPH_EXCEPTION_LOCATION);
1826  }
1827  }
1828 #endif
1829 
1830  // if soln is not setup then setup the distribution
1831  if (!soln.built())
1832  {
1833  // Resize and initialize the solution vector
1834  soln.build(this->distribution_pt(), 0.0);
1835  }
1836 
1837  // Initialise
1838  soln.initialise(0.0);
1839 
1840  // if distributed and on more than one processor use trilinos
1841  // otherwise use the oomph-lib methods
1842  if (this->distributed() &&
1843  this->distribution_pt()->communicator_pt()->nproc() > 1)
1844  {
1845 #ifdef OOMPH_HAS_TRILINOS
1846  // This will only work if we have trilinos on board
1847  TrilinosEpetraHelpers::multiply(this, x, soln);
1848 #else
1849  std::ostringstream error_message_stream;
1850  error_message_stream
1851  << "Matrix-vector product on multiple processors with distributed "
1852  << "CRDoubleMatrix requires Trilinos.";
1853  throw OomphLibError(error_message_stream.str(),
1854  OOMPH_CURRENT_FUNCTION,
1855  OOMPH_EXCEPTION_LOCATION);
1856 #endif
1857  }
1858  else
1859  {
1860  unsigned n = this->nrow();
1861  const int* row_start = CR_matrix.row_start();
1862  const int* column_index = CR_matrix.column_index();
1863  const double* value = CR_matrix.value();
1864  double* soln_pt = soln.values_pt();
1865  const double* x_pt = x.values_pt();
1866  for (unsigned long i = 0; i < n; i++)
1867  {
1868  soln_pt[i] = 0.0;
1869  for (long k = row_start[i]; k < row_start[i + 1]; k++)
1870  {
1871  unsigned long j = column_index[k];
1872  double a_ij = value[k];
1873  soln_pt[i] += a_ij * x_pt[j];
1874  }
1875  }
1876  }
1877  }
1878 
1879  //=================================================================
1880  /// Multiply the transposed matrix by the vector x: soln=A^T x
1881  //=================================================================
1883  DoubleVector& soln) const
1884  {
1885 #ifdef PARANOID
1886  // check that this matrix is built
1887  if (!Built)
1888  {
1889  std::ostringstream error_message_stream;
1890  error_message_stream << "This matrix has not been built";
1891  throw OomphLibError(error_message_stream.str(),
1892  OOMPH_CURRENT_FUNCTION,
1893  OOMPH_EXCEPTION_LOCATION);
1894  }
1895  // Check to see if x.size() = ncol().
1896  if (!(*this->distribution_pt() == *x.distribution_pt()))
1897  {
1898  std::ostringstream error_message_stream;
1899  error_message_stream
1900  << "The x vector and this matrix must have the same distribution.";
1901  throw OomphLibError(error_message_stream.str(),
1902  OOMPH_CURRENT_FUNCTION,
1903  OOMPH_EXCEPTION_LOCATION);
1904  }
1905  // if soln is setup then it should have the same distribution as x
1906  if (soln.built())
1907  {
1908  if (soln.distribution_pt()->nrow() != this->ncol())
1909  {
1910  std::ostringstream error_message_stream;
1911  error_message_stream
1912  << "The soln vector is setup and therefore must have the same "
1913  << "number of rows as the vector x";
1914  throw OomphLibError(error_message_stream.str(),
1915  OOMPH_CURRENT_FUNCTION,
1916  OOMPH_EXCEPTION_LOCATION);
1917  }
1918  }
1919 #endif
1920 
1921  // if soln is not setup then setup the distribution
1922  if (!soln.built())
1923  {
1924  LinearAlgebraDistribution* dist_pt =
1926  this->ncol(),
1927  this->distributed());
1928  soln.build(dist_pt, 0.0);
1929  delete dist_pt;
1930  }
1931 
1932  // Initialise
1933  soln.initialise(0.0);
1934 
1935  if (this->distributed() &&
1936  this->distribution_pt()->communicator_pt()->nproc() > 1)
1937  {
1938 #ifdef OOMPH_HAS_TRILINOS
1939  // This will only work if we have trilinos on board
1940  TrilinosEpetraHelpers::multiply(this, x, soln);
1941 #else
1942  std::ostringstream error_message_stream;
1943  error_message_stream
1944  << "Matrix-vector product on multiple processors with distributed "
1945  << "CRDoubleMatrix requires Trilinos.";
1946  throw OomphLibError(error_message_stream.str(),
1947  OOMPH_CURRENT_FUNCTION,
1948  OOMPH_EXCEPTION_LOCATION);
1949 #endif
1950  }
1951  else
1952  {
1953  unsigned n = this->nrow();
1954  const int* row_start = CR_matrix.row_start();
1955  const int* column_index = CR_matrix.column_index();
1956  const double* value = CR_matrix.value();
1957  double* soln_pt = soln.values_pt();
1958  const double* x_pt = x.values_pt();
1959  // Matrix vector product
1960  for (unsigned long i = 0; i < n; i++)
1961  {
1962  for (long k = row_start[i]; k < row_start[i + 1]; k++)
1963  {
1964  unsigned long j = column_index[k];
1965  double a_ij = value[k];
1966  soln_pt[j] += a_ij * x_pt[i];
1967  }
1968  }
1969  }
1970  }
1971 
1972  //===========================================================================
1973  /// Function to multiply this matrix by the CRDoubleMatrix matrix_in.
1974  /// In a serial matrix, there are 4 methods available:
1975  /// Method 1: First runs through this matrix and matrix_in to find the storage
1976  /// requirements for result - arrays of the correct size are
1977  /// then allocated before performing the calculation.
1978  /// Minimises memory requirements but more costly.
1979  /// Method 2: Grows storage for values and column indices of result 'on the
1980  /// fly' using an array of maps. Faster but more memory
1981  /// intensive.
1982  /// Method 3: Grows storage for values and column indices of result 'on the
1983  /// fly' using a vector of vectors. Not particularly impressive
1984  /// on the platforms we tried...
1985  /// Method 4: Trilinos Epetra Matrix Matrix multiply.
1986  /// Method 5: Trilinox Epetra Matrix Matrix Mulitply (ml based)
1987  /// If Trilinos is installed then Method 4 is employed by default, otherwise
1988  /// Method 2 is employed by default.
1989  /// In a distributed matrix, only Trilinos Epetra Matrix Matrix multiply
1990  /// is available.
1991  //=============================================================================
1993  CRDoubleMatrix& result) const
1994  {
1995 #ifdef PARANOID
1996  // check that this matrix is built
1997  if (!Built)
1998  {
1999  std::ostringstream error_message_stream;
2000  error_message_stream << "This matrix has not been built";
2001  throw OomphLibError(error_message_stream.str(),
2002  OOMPH_CURRENT_FUNCTION,
2003  OOMPH_EXCEPTION_LOCATION);
2004  }
2005  // check that this matrix is built
2006  if (!matrix_in.built())
2007  {
2008  std::ostringstream error_message_stream;
2009  error_message_stream << "This matrix matrix_in has not been built";
2010  throw OomphLibError(error_message_stream.str(),
2011  OOMPH_CURRENT_FUNCTION,
2012  OOMPH_EXCEPTION_LOCATION);
2013  }
2014  // if soln is setup then it should have the same distribution as x
2015  if (result.built())
2016  {
2017  if (!(*result.distribution_pt() == *this->distribution_pt()))
2018  {
2019  std::ostringstream error_message_stream;
2020  error_message_stream
2021  << "The matrix result is setup and therefore must have the same "
2022  << "distribution as the vector x";
2023  throw OomphLibError(error_message_stream.str(),
2024  OOMPH_CURRENT_FUNCTION,
2025  OOMPH_EXCEPTION_LOCATION);
2026  }
2027  }
2028 #endif
2029 
2030  // if the result has not been setup, then store the distribution
2031  if (!result.distribution_built())
2032  {
2033  result.build(this->distribution_pt());
2034  }
2035 
2036  // short name for Serial_matrix_matrix_multiply_method
2037  unsigned method = Serial_matrix_matrix_multiply_method;
2038 
2039  // if this matrix is not distributed and matrix in is not distributed
2040  if (!this->distributed() && !matrix_in.distributed() &&
2041  ((method == 1) || (method == 2) || (method == 3)))
2042  {
2043  // NB N is number of rows!
2044  unsigned long N = this->nrow();
2045  unsigned long M = matrix_in.ncol();
2046  unsigned long Nnz = 0;
2047 
2048  // pointers to arrays which store result
2049  int* Row_start = 0;
2050  double* Value = 0;
2051  int* Column_index = 0;
2052 
2053  // get pointers to matrix_in
2054  const int* matrix_in_row_start = matrix_in.row_start();
2055  const int* matrix_in_column_index = matrix_in.column_index();
2056  const double* matrix_in_value = matrix_in.value();
2057 
2058  // get pointers to this matrix
2059  const double* this_value = this->value();
2060  const int* this_row_start = this->row_start();
2061  const int* this_column_index = this->column_index();
2062 
2063  // clock_t clock1 = clock();
2064 
2065  // METHOD 1
2066  // --------
2067  if (method == 1)
2068  {
2069  // allocate storage for row starts
2070  Row_start = new int[N + 1];
2071  Row_start[0] = 0;
2072 
2073  // a set to store number of non-zero columns in each row of result
2074  std::set<unsigned> columns;
2075 
2076  // run through rows of this matrix and matrix_in to find number of
2077  // non-zero entries in each row of result
2078  for (unsigned long this_row = 0; this_row < N; this_row++)
2079  {
2080  // run through non-zeros in this_row of this matrix
2081  for (int this_ptr = this_row_start[this_row];
2082  this_ptr < this_row_start[this_row + 1];
2083  this_ptr++)
2084  {
2085  // find column index for non-zero
2086  int matrix_in_row = this_column_index[this_ptr];
2087 
2088  // run through corresponding row in matrix_in
2089  for (int matrix_in_ptr = matrix_in_row_start[matrix_in_row];
2090  matrix_in_ptr < matrix_in_row_start[matrix_in_row + 1];
2091  matrix_in_ptr++)
2092  {
2093  // find column index for non-zero in matrix_in and store in
2094  // columns
2095  columns.insert(matrix_in_column_index[matrix_in_ptr]);
2096  }
2097  }
2098  // update Row_start
2099  Row_start[this_row + 1] = Row_start[this_row] + columns.size();
2100 
2101  // wipe values in columns
2102  columns.clear();
2103  }
2104 
2105  // set Nnz
2106  Nnz = Row_start[N];
2107 
2108  // allocate arrays for result
2109  Value = new double[Nnz];
2110  Column_index = new int[Nnz];
2111 
2112  // set all values of Column_index to -1
2113  for (unsigned long i = 0; i < Nnz; i++)
2114  {
2115  Column_index[i] = -1;
2116  }
2117 
2118  // Calculate values for result - first run through rows of this matrix
2119  for (unsigned long this_row = 0; this_row < N; this_row++)
2120  {
2121  // run through non-zeros in this_row
2122  for (int this_ptr = this_row_start[this_row];
2123  this_ptr < this_row_start[this_row + 1];
2124  this_ptr++)
2125  {
2126  // find value of non-zero
2127  double this_val = this_value[this_ptr];
2128 
2129  // find column associated with non-zero
2130  int matrix_in_row = this_column_index[this_ptr];
2131 
2132  // run through corresponding row in matrix_in
2133  for (int matrix_in_ptr = matrix_in_row_start[matrix_in_row];
2134  matrix_in_ptr < matrix_in_row_start[matrix_in_row + 1];
2135  matrix_in_ptr++)
2136  {
2137  // find column index for non-zero in matrix_in
2138  int col = matrix_in_column_index[matrix_in_ptr];
2139 
2140  // find position in result to insert value
2141  for (int ptr = Row_start[this_row];
2142  ptr <= Row_start[this_row + 1];
2143  ptr++)
2144  {
2145  if (ptr == Row_start[this_row + 1])
2146  {
2147  // error - have passed end of row without finding
2148  // correct column
2149  std::ostringstream error_message;
2150  error_message << "Error inserting value in result";
2151 
2152  throw OomphLibError(error_message.str(),
2153  OOMPH_CURRENT_FUNCTION,
2154  OOMPH_EXCEPTION_LOCATION);
2155  }
2156  else if (Column_index[ptr] == -1)
2157  {
2158  // first entry for this column index
2159  Column_index[ptr] = col;
2160  Value[ptr] = this_val * matrix_in_value[matrix_in_ptr];
2161  break;
2162  }
2163  else if (Column_index[ptr] == col)
2164  {
2165  // column index already exists - add value
2166  Value[ptr] += this_val * matrix_in_value[matrix_in_ptr];
2167  break;
2168  }
2169  }
2170  }
2171  }
2172  }
2173  }
2174 
2175  // METHOD 2
2176  // --------
2177  else if (method == 2)
2178  {
2179  // generate array of maps to store values for result
2180  std::map<int, double>* result_maps = new std::map<int, double>[N];
2181 
2182  // run through rows of this matrix
2183  for (unsigned long this_row = 0; this_row < N; this_row++)
2184  {
2185  // run through non-zeros in this_row
2186  for (int this_ptr = this_row_start[this_row];
2187  this_ptr < this_row_start[this_row + 1];
2188  this_ptr++)
2189  {
2190  // find value of non-zero
2191  double this_val = this_value[this_ptr];
2192 
2193  // find column index associated with non-zero
2194  int matrix_in_row = this_column_index[this_ptr];
2195 
2196  // run through corresponding row in matrix_in
2197  for (int matrix_in_ptr = matrix_in_row_start[matrix_in_row];
2198  matrix_in_ptr < matrix_in_row_start[matrix_in_row + 1];
2199  matrix_in_ptr++)
2200  {
2201  // find column index for non-zero in matrix_in
2202  int col = matrix_in_column_index[matrix_in_ptr];
2203 
2204  // insert value
2205  result_maps[this_row][col] +=
2206  this_val * matrix_in_value[matrix_in_ptr];
2207  }
2208  }
2209  }
2210 
2211  // allocate Row_start
2212  Row_start = new int[N + 1];
2213 
2214  // copy across row starts
2215  Row_start[0] = 0;
2216  for (unsigned long row = 0; row < N; row++)
2217  {
2218  int size = result_maps[row].size();
2219  Row_start[row + 1] = Row_start[row] + size;
2220  }
2221 
2222  // set Nnz
2223  Nnz = Row_start[N];
2224 
2225  // allocate other arrays
2226  Value = new double[Nnz];
2227  Column_index = new int[Nnz];
2228 
2229  // copy values and column indices
2230  for (unsigned long row = 0; row < N; row++)
2231  {
2232  unsigned ptr = Row_start[row];
2233  for (std::map<int, double>::iterator i = result_maps[row].begin();
2234  i != result_maps[row].end();
2235  i++)
2236  {
2237  Column_index[ptr] = i->first;
2238  Value[ptr] = i->second;
2239  ptr++;
2240  }
2241  }
2242 
2243  // tidy up memory
2244  delete[] result_maps;
2245  }
2246 
2247  // METHOD 3
2248  // --------
2249  else if (method == 3)
2250  {
2251  // vectors of vectors to store results
2252  std::vector<std::vector<int>> result_cols(N);
2253  std::vector<std::vector<double>> result_vals(N);
2254 
2255  // run through the rows of this matrix
2256  for (unsigned long this_row = 0; this_row < N; this_row++)
2257  {
2258  // run through non-zeros in this_row
2259  for (int this_ptr = this_row_start[this_row];
2260  this_ptr < this_row_start[this_row + 1];
2261  this_ptr++)
2262  {
2263  // find value of non-zero
2264  double this_val = this_value[this_ptr];
2265 
2266  // find column index associated with non-zero
2267  int matrix_in_row = this_column_index[this_ptr];
2268 
2269  // run through corresponding row in matrix_in
2270  for (int matrix_in_ptr = matrix_in_row_start[matrix_in_row];
2271  matrix_in_ptr < matrix_in_row_start[matrix_in_row + 1];
2272  matrix_in_ptr++)
2273  {
2274  // find column index for non-zero in matrix_in
2275  int col = matrix_in_column_index[matrix_in_ptr];
2276 
2277  // insert value
2278  int size = result_cols[this_row].size();
2279  for (int i = 0; i <= size; i++)
2280  {
2281  if (i == size)
2282  {
2283  // first entry for this column
2284  result_cols[this_row].push_back(col);
2285  result_vals[this_row].push_back(
2286  this_val * matrix_in_value[matrix_in_ptr]);
2287  }
2288  else if (col == result_cols[this_row][i])
2289  {
2290  // column already exists
2291  result_vals[this_row][i] +=
2292  this_val * matrix_in_value[matrix_in_ptr];
2293  break;
2294  }
2295  }
2296  }
2297  }
2298  }
2299 
2300  // allocate Row_start
2301  Row_start = new int[N + 1];
2302 
2303  // copy across row starts
2304  Row_start[0] = 0;
2305  for (unsigned long row = 0; row < N; row++)
2306  {
2307  int size = result_cols[row].size();
2308  Row_start[row + 1] = Row_start[row] + size;
2309  }
2310 
2311  // set Nnz
2312  Nnz = Row_start[N];
2313 
2314  // allocate other arrays
2315  Value = new double[Nnz];
2316  Column_index = new int[Nnz];
2317 
2318  // copy across values and column indices
2319  for (unsigned long row = 0; row < N; row++)
2320  {
2321  unsigned ptr = Row_start[row];
2322  unsigned nnn = result_cols[row].size();
2323  for (unsigned i = 0; i < nnn; i++)
2324  {
2325  Column_index[ptr] = result_cols[row][i];
2326  Value[ptr] = result_vals[row][i];
2327  ptr++;
2328  }
2329  }
2330  }
2331 
2332  // build
2333  result.build_without_copy(M, Nnz, Value, Column_index, Row_start);
2334  }
2335 
2336  // else we have to use trilinos
2337  else
2338  {
2339 #ifdef OOMPH_HAS_TRILINOS
2340  bool use_ml = false;
2341  if (method == 5)
2342  {
2343  use_ml = true;
2344  }
2345  TrilinosEpetraHelpers::multiply(*this, matrix_in, result, use_ml);
2346 #else
2347  std::ostringstream error_message;
2348  error_message << "Serial_matrix_matrix_multiply_method = "
2350  << " requires trilinos.";
2351  throw OomphLibError(
2352  error_message.str(), OOMPH_CURRENT_FUNCTION, OOMPH_EXCEPTION_LOCATION);
2353 #endif
2354  }
2355  }
2356 
2357 
2358  //=================================================================
2359  /// For every row, find the maximum absolute value of the
2360  /// entries in this row. Set all values that are less than alpha times
2361  /// this maximum to zero and return the resulting matrix in
2362  /// reduced_matrix. Note: Diagonal entries are retained regardless
2363  /// of their size.
2364  //=================================================================
2365  void CRDoubleMatrix::matrix_reduction(const double& alpha,
2366  CRDoubleMatrix& reduced_matrix)
2367  {
2368  // number of rows in matrix
2369  long n_row = nrow_local();
2370  double max_row;
2371 
2372  // Here's the packed format for the new matrix
2373  Vector<int> B_row_start(1);
2374  Vector<int> B_column_index;
2375  Vector<double> B_value;
2376 
2377  // get pointers to the underlying data
2378  const int* row_start = CR_matrix.row_start();
2379  const int* column_index = CR_matrix.column_index();
2380  const double* value = CR_matrix.value();
2381 
2382  // k is counter for the number of entries in the reduced matrix
2383  unsigned k = 0;
2384 
2385  // Initialise row start
2386  B_row_start[0] = 0;
2387 
2388  // Loop over rows
2389  for (long i = 0; i < n_row; i++)
2390  {
2391  // Initialise max value in row
2392  max_row = 0.0;
2393 
2394  // Loop over entries in columns
2395  for (long j = row_start[i]; j < row_start[i + 1]; j++)
2396  {
2397  // Find max. value in row
2398  if (std::fabs(value[j]) > max_row)
2399  {
2400  max_row = std::fabs(value[j]);
2401  }
2402  }
2403 
2404  // Decide if we need to retain the entries in the row
2405  for (long j = row_start[i]; j < row_start[i + 1]; j++)
2406  {
2407  // If we're on the diagonal or the value is sufficiently large: retain
2408  // i.e. copy across.
2409  if (i == column_index[j] || std::fabs(value[j]) > alpha * max_row)
2410  {
2411  B_value.push_back(value[j]);
2412  B_column_index.push_back(column_index[j]);
2413  k++;
2414  }
2415  }
2416  // This writes the row start for the next row -- equal to
2417  // to the number of entries written so far (C++ zero-based indexing!)
2418  B_row_start.push_back(k);
2419  }
2420 
2421  // Build the matrix from the compressed format
2422  dynamic_cast<CRDoubleMatrix&>(reduced_matrix)
2423  .build(this->ncol(), B_value, B_column_index, B_row_start);
2424  }
2425 
2426  //=============================================================================
2427  /// if this matrix is distributed then the equivalent global matrix is built
2428  /// using new and returned. The calling method is responsible for the
2429  /// destruction of the new matrix.
2430  //=============================================================================
2432  {
2433 #ifdef OOMPH_HAS_MPI
2434  // if this matrix is not distributed then this method is redundant
2435  if (!this->distributed() ||
2436  this->distribution_pt()->communicator_pt()->nproc() == 1)
2437  {
2438  return new CRDoubleMatrix(*this);
2439  }
2440 
2441  // nnz
2442  int nnz = this->nnz();
2443 
2444  // my nrow local
2445  unsigned nrow_local = this->nrow_local();
2446 
2447  // nrow global
2448  unsigned nrow = this->nrow();
2449 
2450  // cache nproc
2451  int nproc = this->distribution_pt()->communicator_pt()->nproc();
2452 
2453  // get the nnzs on the other processors
2454  int* dist_nnz_pt = new int[nproc];
2455  MPI_Allgather(&nnz,
2456  1,
2457  MPI_INT,
2458  dist_nnz_pt,
2459  1,
2460  MPI_INT,
2461  this->distribution_pt()->communicator_pt()->mpi_comm());
2462 
2463  // create a int vector of first rows and nrow local and compute nnz global
2464  int* dist_first_row = new int[nproc];
2465  int* dist_nrow_local = new int[nproc];
2466  int nnz_global = 0;
2467  for (int p = 0; p < nproc; p++)
2468  {
2469  nnz_global += dist_nnz_pt[p];
2470  dist_first_row[p] = this->first_row(p);
2471  dist_nrow_local[p] = this->nrow_local(p);
2472  }
2473 
2474  // compute the offset for the values and column index data
2475  int* nnz_offset = new int[nproc];
2476  nnz_offset[0] = 0;
2477  for (int p = 1; p < nproc; p++)
2478  {
2479  nnz_offset[p] = nnz_offset[p - 1] + dist_nnz_pt[p - 1];
2480  }
2481 
2482  // get pointers to the (current) distributed data
2483  // const_cast required because MPI requires non-const data when sending
2484  // data
2485  int* dist_row_start = const_cast<int*>(this->row_start());
2486  int* dist_column_index = const_cast<int*>(this->column_index());
2487  double* dist_value = const_cast<double*>(this->value());
2488 
2489  // space for the global matrix
2490  int* global_row_start = new int[nrow + 1];
2491  int* global_column_index = new int[nnz_global];
2492  double* global_value = new double[nnz_global];
2493 
2494  // get the row starts
2495  MPI_Allgatherv(dist_row_start,
2496  nrow_local,
2497  MPI_INT,
2498  global_row_start,
2499  dist_nrow_local,
2500  dist_first_row,
2501  MPI_INT,
2502  this->distribution_pt()->communicator_pt()->mpi_comm());
2503 
2504  // get the column indexes
2505  MPI_Allgatherv(dist_column_index,
2506  nnz,
2507  MPI_INT,
2508  global_column_index,
2509  dist_nnz_pt,
2510  nnz_offset,
2511  MPI_INT,
2512  this->distribution_pt()->communicator_pt()->mpi_comm());
2513 
2514  // get the values
2515  MPI_Allgatherv(dist_value,
2516  nnz,
2517  MPI_DOUBLE,
2518  global_value,
2519  dist_nnz_pt,
2520  nnz_offset,
2521  MPI_DOUBLE,
2522  this->distribution_pt()->communicator_pt()->mpi_comm());
2523 
2524  // finally the last row start
2525  global_row_start[nrow] = nnz_global;
2526 
2527  // update the other row start
2528  for (int p = 0; p < nproc; p++)
2529  {
2530  for (int i = 0; i < dist_nrow_local[p]; i++)
2531  {
2532  unsigned j = dist_first_row[p] + i;
2533  global_row_start[j] += nnz_offset[p];
2534  }
2535  }
2536 
2537  // create the global distribution
2539  this->distribution_pt()->communicator_pt(), nrow, false);
2540 
2541  // create the matrix
2542  CRDoubleMatrix* matrix_pt = new CRDoubleMatrix(dist_pt);
2543 
2544  // copy of distribution taken so delete
2545  delete dist_pt;
2546 
2547  // pass data into matrix
2548  matrix_pt->build_without_copy(this->ncol(),
2549  nnz_global,
2550  global_value,
2551  global_column_index,
2552  global_row_start);
2553 
2554  // clean up
2555  delete[] dist_first_row;
2556  delete[] dist_nrow_local;
2557  delete[] nnz_offset;
2558  delete[] dist_nnz_pt;
2559 
2560  // and return
2561  return matrix_pt;
2562 #else
2563  return new CRDoubleMatrix(*this);
2564 #endif
2565  }
2566 
2567  //============================================================================
2568  /// The contents of the matrix are redistributed to match the new
2569  /// distribution. In a non-MPI build this method does nothing.
2570  /// \b NOTE 1: The current distribution and the new distribution must have
2571  /// the same number of global rows.
2572  /// \b NOTE 2: The current distribution and the new distribution must have
2573  /// the same Communicator.
2574  //============================================================================
2576  const LinearAlgebraDistribution* const& dist_pt)
2577  {
2578 #ifdef OOMPH_HAS_MPI
2579 #ifdef PARANOID
2580  // paranoid check that the nrows for both distributions is the
2581  // same
2582  if (dist_pt->nrow() != this->distribution_pt()->nrow())
2583  {
2584  std::ostringstream error_message;
2585  error_message << "The number of global rows in the new distribution ("
2586  << dist_pt->nrow() << ") is not equal to the number"
2587  << " of global rows in the current distribution ("
2588  << this->distribution_pt()->nrow() << ").\n";
2589  throw OomphLibError(
2590  error_message.str(), OOMPH_CURRENT_FUNCTION, OOMPH_EXCEPTION_LOCATION);
2591  }
2592  // paranoid check that the current distribution and the new distribution
2593  // have the same Communicator
2594  OomphCommunicator temp_comm(*dist_pt->communicator_pt());
2595  if (!(temp_comm == *this->distribution_pt()->communicator_pt()))
2596  {
2597  std::ostringstream error_message;
2598  error_message << "The new distribution and the current distribution must "
2599  << "have the same communicator.";
2600  throw OomphLibError(
2601  error_message.str(), OOMPH_CURRENT_FUNCTION, OOMPH_EXCEPTION_LOCATION);
2602  }
2603  // paranoid check that the matrix is build
2604  if (!this->built())
2605  {
2606  std::ostringstream error_message;
2607  error_message << "The matrix must be build to be redistributed";
2608  throw OomphLibError(
2609  error_message.str(), OOMPH_CURRENT_FUNCTION, OOMPH_EXCEPTION_LOCATION);
2610  }
2611 #endif
2612 
2613  // if the two distributions are not the same
2614  // =========================================
2615  if (!((*this->distribution_pt()) == *dist_pt))
2616  {
2617  // Get the number of columns to build the matrix.
2618  unsigned long ncol = this->ncol();
2619 
2620  // current data
2621  int* current_row_start = this->row_start();
2622  int* current_column_index = this->column_index();
2623  double* current_value = this->value();
2624 
2625  // get the rank and the number of processors
2626  int my_rank = this->distribution_pt()->communicator_pt()->my_rank();
2627  int nproc = this->distribution_pt()->communicator_pt()->nproc();
2628 
2629  // if both distributions are distributed
2630  // =====================================
2631  if (this->distributed() && dist_pt->distributed())
2632  {
2633  // new nrow_local and first_row data
2634  Vector<unsigned> new_first_row(nproc);
2635  Vector<unsigned> new_nrow_local(nproc);
2636  Vector<unsigned> current_first_row(nproc);
2637  Vector<unsigned> current_nrow_local(nproc);
2638  for (int i = 0; i < nproc; i++)
2639  {
2640  new_first_row[i] = dist_pt->first_row(i);
2641  new_nrow_local[i] = dist_pt->nrow_local(i);
2642  current_first_row[i] = this->first_row(i);
2643  current_nrow_local[i] = this->nrow_local(i);
2644  }
2645 
2646  // compute which local rows are expected to be received from each
2647  // processor / sent to each processor
2648  Vector<unsigned> first_row_for_proc(nproc, 0);
2649  Vector<unsigned> nrow_local_for_proc(nproc, 0);
2650  Vector<unsigned> first_row_from_proc(nproc, 0);
2651  Vector<unsigned> nrow_local_from_proc(nproc, 0);
2652 
2653  // for every processor compute first_row and nrow_local that will
2654  // will sent and received by this processor
2655  for (int p = 0; p < nproc; p++)
2656  {
2657  // start with data to be sent
2658  if ((new_first_row[p] <
2659  (current_first_row[my_rank] + current_nrow_local[my_rank])) &&
2660  (current_first_row[my_rank] <
2661  (new_first_row[p] + new_nrow_local[p])))
2662  {
2663  first_row_for_proc[p] =
2664  std::max(current_first_row[my_rank], new_first_row[p]);
2665  nrow_local_for_proc[p] =
2666  std::min(
2667  (current_first_row[my_rank] + current_nrow_local[my_rank]),
2668  (new_first_row[p] + new_nrow_local[p])) -
2669  first_row_for_proc[p];
2670  }
2671 
2672  // and data to be received
2673  if ((new_first_row[my_rank] <
2674  (current_first_row[p] + current_nrow_local[p])) &&
2675  (current_first_row[p] <
2676  (new_first_row[my_rank] + new_nrow_local[my_rank])))
2677  {
2678  first_row_from_proc[p] =
2679  std::max(current_first_row[p], new_first_row[my_rank]);
2680  nrow_local_from_proc[p] =
2681  std::min((current_first_row[p] + current_nrow_local[p]),
2682  (new_first_row[my_rank] + new_nrow_local[my_rank])) -
2683  first_row_from_proc[p];
2684  }
2685  }
2686 
2687  // determine how many nnzs to send to each processor
2688  Vector<unsigned> nnz_for_proc(nproc, 0);
2689  for (int p = 0; p < nproc; p++)
2690  {
2691  if (nrow_local_for_proc[p] > 0)
2692  {
2693  nnz_for_proc[p] = (current_row_start[first_row_for_proc[p] -
2694  current_first_row[my_rank] +
2695  nrow_local_for_proc[p]] -
2696  current_row_start[first_row_for_proc[p] -
2697  current_first_row[my_rank]]);
2698  }
2699  }
2700 
2701  // next post non-blocking sends and recv for the nnzs
2702  Vector<unsigned> nnz_from_proc(nproc, 0);
2703  Vector<MPI_Request> send_req;
2704  Vector<MPI_Request> nnz_recv_req;
2705  for (int p = 0; p < nproc; p++)
2706  {
2707  if (p != my_rank)
2708  {
2709  // send
2710  if (nrow_local_for_proc[p] > 0)
2711  {
2712  MPI_Request req;
2713  MPI_Isend(&nnz_for_proc[p],
2714  1,
2715  MPI_UNSIGNED,
2716  p,
2717  0,
2718  this->distribution_pt()->communicator_pt()->mpi_comm(),
2719  &req);
2720  send_req.push_back(req);
2721  }
2722 
2723  // recv
2724  if (nrow_local_from_proc[p] > 0)
2725  {
2726  MPI_Request req;
2727  MPI_Irecv(&nnz_from_proc[p],
2728  1,
2729  MPI_UNSIGNED,
2730  p,
2731  0,
2732  this->distribution_pt()->communicator_pt()->mpi_comm(),
2733  &req);
2734  nnz_recv_req.push_back(req);
2735  }
2736  }
2737  // "send to self"
2738  else
2739  {
2740  nnz_from_proc[p] = nnz_for_proc[p];
2741  }
2742  }
2743 
2744  // allocate new storage for the new row_start
2745  int* new_row_start = new int[new_nrow_local[my_rank] + 1];
2746 
2747  // wait for recvs to complete
2748  unsigned n_recv_req = nnz_recv_req.size();
2749  if (n_recv_req > 0)
2750  {
2751  Vector<MPI_Status> recv_status(n_recv_req);
2752  MPI_Waitall(n_recv_req, &nnz_recv_req[0], &recv_status[0]);
2753  }
2754 
2755  // compute the nnz offset for each processor
2756  unsigned next_row = 0;
2757  unsigned nnz_count = 0;
2758  Vector<unsigned> nnz_offset(nproc, 0);
2759  for (int p = 0; p < nproc; p++)
2760  {
2761  unsigned pp = 0;
2762  while (new_first_row[pp] != next_row)
2763  {
2764  pp++;
2765  }
2766  nnz_offset[pp] = nnz_count;
2767  nnz_count += nnz_from_proc[pp];
2768  next_row += new_nrow_local[pp];
2769  }
2770 
2771  // allocate storage for the values and column indices
2772  int* new_column_index = new int[nnz_count];
2773  double* new_value = new double[nnz_count];
2774 
2775  // post the sends and recvs for the matrix data
2776  Vector<MPI_Request> recv_req;
2777  MPI_Aint base_address;
2778  MPI_Get_address(new_value, &base_address);
2779  for (int p = 0; p < nproc; p++)
2780  {
2781  // communicated with other processors
2782  if (p != my_rank)
2783  {
2784  // SEND
2785  if (nrow_local_for_proc[p] > 0)
2786  {
2787  // array of datatypes
2788  MPI_Datatype types[3];
2789 
2790  // array of offsets
2791  MPI_Aint offsets[3];
2792 
2793  // array of lengths
2794  int len[3];
2795 
2796  // row start
2797  unsigned first_row_to_send =
2798  first_row_for_proc[p] - current_first_row[my_rank];
2799  MPI_Type_contiguous(nrow_local_for_proc[p], MPI_INT, &types[0]);
2800  MPI_Type_commit(&types[0]);
2801  len[0] = 1;
2802  MPI_Get_address(current_row_start + first_row_to_send,
2803  &offsets[0]);
2804  offsets[0] -= base_address;
2805 
2806  // values
2807  unsigned first_coef_to_send =
2808  current_row_start[first_row_to_send];
2809  MPI_Type_contiguous(nnz_for_proc[p], MPI_DOUBLE, &types[1]);
2810  MPI_Type_commit(&types[1]);
2811  len[1] = 1;
2812  MPI_Get_address(current_value + first_coef_to_send, &offsets[1]);
2813  offsets[1] -= base_address;
2814 
2815  // column index
2816  MPI_Type_contiguous(nnz_for_proc[p], MPI_DOUBLE, &types[2]);
2817  MPI_Type_commit(&types[2]);
2818  len[2] = 1;
2819  MPI_Get_address(current_column_index + first_coef_to_send,
2820  &offsets[2]);
2821  offsets[2] -= base_address;
2822 
2823  // build the combined datatype
2824  MPI_Datatype send_type;
2825  MPI_Type_create_struct(3, len, offsets, types, &send_type);
2826  MPI_Type_commit(&send_type);
2827  MPI_Type_free(&types[0]);
2828  MPI_Type_free(&types[1]);
2829  MPI_Type_free(&types[2]);
2830 
2831  // and send
2832  MPI_Request req;
2833  MPI_Isend(new_value,
2834  1,
2835  send_type,
2836  p,
2837  1,
2838  this->distribution_pt()->communicator_pt()->mpi_comm(),
2839  &req);
2840  send_req.push_back(req);
2841  MPI_Type_free(&send_type);
2842  }
2843 
2844  // RECV
2845  if (nrow_local_from_proc[p] > 0)
2846  {
2847  // array of datatypes
2848  MPI_Datatype types[3];
2849 
2850  // array of offsets
2851  MPI_Aint offsets[3];
2852 
2853  // array of lengths
2854  int len[3];
2855 
2856  // row start
2857  unsigned first_row_to_recv =
2858  first_row_from_proc[p] - new_first_row[my_rank];
2859  MPI_Type_contiguous(nrow_local_from_proc[p], MPI_INT, &types[0]);
2860  MPI_Type_commit(&types[0]);
2861  len[0] = 1;
2862  MPI_Get_address(new_row_start + first_row_to_recv, &offsets[0]);
2863  offsets[0] -= base_address;
2864 
2865  // values
2866  unsigned first_coef_to_recv = nnz_offset[p];
2867  MPI_Type_contiguous(nnz_from_proc[p], MPI_DOUBLE, &types[1]);
2868  MPI_Type_commit(&types[1]);
2869  len[1] = 1;
2870  MPI_Get_address(new_value + first_coef_to_recv, &offsets[1]);
2871  offsets[1] -= base_address;
2872 
2873  // column index
2874  MPI_Type_contiguous(nnz_from_proc[p], MPI_INT, &types[2]);
2875  MPI_Type_commit(&types[2]);
2876  len[2] = 1;
2877  MPI_Get_address(new_column_index + first_coef_to_recv,
2878  &offsets[2]);
2879  offsets[2] -= base_address;
2880 
2881  // build the combined datatype
2882  MPI_Datatype recv_type;
2883  MPI_Type_create_struct(3, len, offsets, types, &recv_type);
2884  MPI_Type_commit(&recv_type);
2885  MPI_Type_free(&types[0]);
2886  MPI_Type_free(&types[1]);
2887  MPI_Type_free(&types[2]);
2888 
2889  // and send
2890  MPI_Request req;
2891  MPI_Irecv(new_value,
2892  1,
2893  recv_type,
2894  p,
2895  1,
2896  this->distribution_pt()->communicator_pt()->mpi_comm(),
2897  &req);
2898  recv_req.push_back(req);
2899  MPI_Type_free(&recv_type);
2900  }
2901  }
2902  // other wise transfer data internally
2903  else
2904  {
2905  unsigned j =
2906  first_row_for_proc[my_rank] - current_first_row[my_rank];
2907  unsigned k = first_row_from_proc[my_rank] - new_first_row[my_rank];
2908  for (unsigned i = 0; i < nrow_local_for_proc[my_rank]; i++)
2909  {
2910  new_row_start[k + i] = current_row_start[j + i];
2911  }
2912  unsigned first_coef_to_send = current_row_start[j];
2913  for (unsigned i = 0; i < nnz_for_proc[my_rank]; i++)
2914  {
2915  new_value[nnz_offset[p] + i] =
2916  current_value[first_coef_to_send + i];
2917  new_column_index[nnz_offset[p] + i] =
2918  current_column_index[first_coef_to_send + i];
2919  }
2920  }
2921  }
2922 
2923  // wait for all recvs to complete
2924  n_recv_req = recv_req.size();
2925  if (n_recv_req > 0)
2926  {
2927  Vector<MPI_Status> recv_status(n_recv_req);
2928  MPI_Waitall(n_recv_req, &recv_req[0], &recv_status[0]);
2929  }
2930 
2931  // next we need to update the row starts
2932  for (int p = 0; p < nproc; p++)
2933  {
2934  if (nrow_local_from_proc[p] > 0)
2935  {
2936  unsigned first_row =
2937  first_row_from_proc[p] - new_first_row[my_rank];
2938  unsigned last_row = first_row + nrow_local_from_proc[p] - 1;
2939  int update = nnz_offset[p] - new_row_start[first_row];
2940  for (unsigned i = first_row; i <= last_row; i++)
2941  {
2942  new_row_start[i] += update;
2943  }
2944  }
2945  }
2946  new_row_start[dist_pt->nrow_local()] = nnz_count;
2947 
2948  // wait for sends to complete
2949  unsigned n_send_req = send_req.size();
2950  if (n_recv_req > 0)
2951  {
2952  Vector<MPI_Status> send_status(n_recv_req);
2953  MPI_Waitall(n_send_req, &send_req[0], &send_status[0]);
2954  }
2955  // if (my_rank == 0)
2956  // {
2957  // CRDoubleMatrix* m_pt = this->global_matrix();
2958  // m_pt->sparse_indexed_output("m1.dat");
2959  // }
2960 
2961  //
2962  this->build(dist_pt);
2963  this->build_without_copy(
2964  ncol, nnz_count, new_value, new_column_index, new_row_start);
2965  // if (my_rank == 0)
2966  // {
2967  // CRDoubleMatrix* m_pt = this->global_matrix();
2968  // m_pt->sparse_indexed_output("m2.dat");
2969  // }
2970  // this->sparse_indexed_output(oomph_info);
2971  abort();
2972  }
2973 
2974  // if this matrix is distributed but the new distributed matrix is global
2975  // ======================================================================
2976  else if (this->distributed() && !dist_pt->distributed())
2977  {
2978  // nnz
2979  int nnz = this->nnz();
2980 
2981  // nrow global
2982  unsigned nrow = this->nrow();
2983 
2984  // cache nproc
2985  int nproc = this->distribution_pt()->communicator_pt()->nproc();
2986 
2987  // get the nnzs on the other processors
2988  int* dist_nnz_pt = new int[nproc];
2989  MPI_Allgather(&nnz,
2990  1,
2991  MPI_INT,
2992  dist_nnz_pt,
2993  1,
2994  MPI_INT,
2995  this->distribution_pt()->communicator_pt()->mpi_comm());
2996 
2997  // create an int array of first rows and nrow local and
2998  // compute nnz global
2999  int* dist_first_row = new int[nproc];
3000  int* dist_nrow_local = new int[nproc];
3001  for (int p = 0; p < nproc; p++)
3002  {
3003  dist_first_row[p] = this->first_row(p);
3004  dist_nrow_local[p] = this->nrow_local(p);
3005  }
3006 
3007  // conpute the offset for the values and column index data
3008  // compute the nnz offset for each processor
3009  int next_row = 0;
3010  unsigned nnz_count = 0;
3011  Vector<unsigned> nnz_offset(nproc, 0);
3012  for (int p = 0; p < nproc; p++)
3013  {
3014  unsigned pp = 0;
3015  while (dist_first_row[pp] != next_row)
3016  {
3017  pp++;
3018  }
3019  nnz_offset[pp] = nnz_count;
3020  nnz_count += dist_nnz_pt[pp];
3021  next_row += dist_nrow_local[pp];
3022  }
3023 
3024  // get pointers to the (current) distributed data
3025  int* dist_row_start = this->row_start();
3026  int* dist_column_index = this->column_index();
3027  double* dist_value = this->value();
3028 
3029  // space for the global matrix
3030  int* global_row_start = new int[nrow + 1];
3031  int* global_column_index = new int[nnz_count];
3032  double* global_value = new double[nnz_count];
3033 
3034  // post the sends and recvs for the matrix data
3035  Vector<MPI_Request> recv_req;
3036  Vector<MPI_Request> send_req;
3037  MPI_Aint base_address;
3038  MPI_Get_address(global_value, &base_address);
3039 
3040  // SEND
3041  if (dist_nrow_local[my_rank] > 0)
3042  {
3043  // types
3044  MPI_Datatype types[3];
3045 
3046  // offsets
3047  MPI_Aint offsets[3];
3048 
3049  // lengths
3050  int len[3];
3051 
3052  // row start
3053  MPI_Type_contiguous(dist_nrow_local[my_rank], MPI_INT, &types[0]);
3054  MPI_Type_commit(&types[0]);
3055  MPI_Get_address(dist_row_start, &offsets[0]);
3056  offsets[0] -= base_address;
3057  len[0] = 1;
3058 
3059  // value
3060  MPI_Type_contiguous(nnz, MPI_DOUBLE, &types[1]);
3061  MPI_Type_commit(&types[1]);
3062  MPI_Get_address(dist_value, &offsets[1]);
3063  offsets[1] -= base_address;
3064  len[1] = 1;
3065 
3066  // column indices
3067  MPI_Type_contiguous(nnz, MPI_INT, &types[2]);
3068  MPI_Type_commit(&types[2]);
3069  MPI_Get_address(dist_column_index, &offsets[2]);
3070  offsets[2] -= base_address;
3071  len[2] = 1;
3072 
3073  // build the send type
3074  MPI_Datatype send_type;
3075  MPI_Type_create_struct(3, len, offsets, types, &send_type);
3076  MPI_Type_commit(&send_type);
3077  MPI_Type_free(&types[0]);
3078  MPI_Type_free(&types[1]);
3079  MPI_Type_free(&types[2]);
3080 
3081  // and send
3082  for (int p = 0; p < nproc; p++)
3083  {
3084  if (p != my_rank)
3085  {
3086  MPI_Request req;
3087  MPI_Isend(global_value,
3088  1,
3089  send_type,
3090  p,
3091  1,
3092  this->distribution_pt()->communicator_pt()->mpi_comm(),
3093  &req);
3094  send_req.push_back(req);
3095  }
3096  }
3097  MPI_Type_free(&send_type);
3098  }
3099 
3100  // RECV
3101  for (int p = 0; p < nproc; p++)
3102  {
3103  // communicated with other processors
3104  if (p != my_rank)
3105  {
3106  // RECV
3107  if (dist_nrow_local[p] > 0)
3108  {
3109  // types
3110  MPI_Datatype types[3];
3111 
3112  // offsets
3113  MPI_Aint offsets[3];
3114 
3115  // lengths
3116  int len[3];
3117 
3118  // row start
3119  MPI_Type_contiguous(dist_nrow_local[p], MPI_INT, &types[0]);
3120  MPI_Type_commit(&types[0]);
3121  MPI_Get_address(global_row_start + dist_first_row[p],
3122  &offsets[0]);
3123  offsets[0] -= base_address;
3124  len[0] = 1;
3125 
3126  // value
3127  MPI_Type_contiguous(dist_nnz_pt[p], MPI_DOUBLE, &types[1]);
3128  MPI_Type_commit(&types[1]);
3129  MPI_Get_address(global_value + nnz_offset[p], &offsets[1]);
3130  offsets[1] -= base_address;
3131  len[1] = 1;
3132 
3133  // column indices
3134  MPI_Type_contiguous(dist_nnz_pt[p], MPI_INT, &types[2]);
3135  MPI_Type_commit(&types[2]);
3136  MPI_Get_address(global_column_index + nnz_offset[p], &offsets[2]);
3137  offsets[2] -= base_address;
3138  len[2] = 1;
3139 
3140  // build the send type
3141  MPI_Datatype recv_type;
3142  MPI_Type_create_struct(3, len, offsets, types, &recv_type);
3143  MPI_Type_commit(&recv_type);
3144  MPI_Type_free(&types[0]);
3145  MPI_Type_free(&types[1]);
3146  MPI_Type_free(&types[2]);
3147 
3148  // and send
3149  MPI_Request req;
3150  MPI_Irecv(global_value,
3151  1,
3152  recv_type,
3153  p,
3154  1,
3155  this->distribution_pt()->communicator_pt()->mpi_comm(),
3156  &req);
3157  recv_req.push_back(req);
3158  MPI_Type_free(&recv_type);
3159  }
3160  }
3161  // otherwise send to self
3162  else
3163  {
3164  unsigned nrow_local = dist_nrow_local[my_rank];
3165  unsigned first_row = dist_first_row[my_rank];
3166  for (unsigned i = 0; i < nrow_local; i++)
3167  {
3168  global_row_start[first_row + i] = dist_row_start[i];
3169  }
3170  unsigned offset = nnz_offset[my_rank];
3171  for (int i = 0; i < nnz; i++)
3172  {
3173  global_value[offset + i] = dist_value[i];
3174  global_column_index[offset + i] = dist_column_index[i];
3175  }
3176  }
3177  }
3178 
3179  // wait for all recvs to complete
3180  unsigned n_recv_req = recv_req.size();
3181  if (n_recv_req > 0)
3182  {
3183  Vector<MPI_Status> recv_status(n_recv_req);
3184  MPI_Waitall(n_recv_req, &recv_req[0], &recv_status[0]);
3185  }
3186 
3187  // finally the last row start
3188  global_row_start[nrow] = nnz_count;
3189 
3190  // update the other row start
3191  for (int p = 0; p < nproc; p++)
3192  {
3193  for (int i = 0; i < dist_nrow_local[p]; i++)
3194  {
3195  unsigned j = dist_first_row[p] + i;
3196  global_row_start[j] += nnz_offset[p];
3197  }
3198  }
3199 
3200  // wait for sends to complete
3201  unsigned n_send_req = send_req.size();
3202  if (n_recv_req > 0)
3203  {
3204  Vector<MPI_Status> send_status(n_recv_req);
3205  MPI_Waitall(n_send_req, &send_req[0], &send_status[0]);
3206  }
3207 
3208  // rebuild the matrix
3210  this->distribution_pt()->communicator_pt(), nrow, false);
3211  this->build(dist_pt);
3212  this->build_without_copy(
3213  ncol, nnz_count, global_value, global_column_index, global_row_start);
3214 
3215  // clean up
3216  delete dist_pt;
3217  delete[] dist_first_row;
3218  delete[] dist_nrow_local;
3219  delete[] dist_nnz_pt;
3220  }
3221 
3222  // other the matrix is not distributed but it needs to be turned
3223  // into a distributed matrix
3224  // =============================================================
3225  else if (!this->distributed() && dist_pt->distributed())
3226  {
3227  // cache the new nrow_local
3228  unsigned nrow_local = dist_pt->nrow_local();
3229 
3230  // and first_row
3231  unsigned first_row = dist_pt->first_row();
3232 
3233  // get pointers to the (current) distributed data
3234  int* global_row_start = this->row_start();
3235  int* global_column_index = this->column_index();
3236  double* global_value = this->value();
3237 
3238  // determine the number of non zeros required by this processor
3239  unsigned nnz = global_row_start[first_row + nrow_local] -
3240  global_row_start[first_row];
3241 
3242  // allocate
3243  int* dist_row_start = new int[nrow_local + 1];
3244  int* dist_column_index = new int[nnz];
3245  double* dist_value = new double[nnz];
3246 
3247  // copy
3248  int offset = global_row_start[first_row];
3249  for (unsigned i = 0; i <= nrow_local; i++)
3250  {
3251  dist_row_start[i] = global_row_start[first_row + i] - offset;
3252  }
3253  for (unsigned i = 0; i < nnz; i++)
3254  {
3255  dist_column_index[i] = global_column_index[offset + i];
3256  dist_value[i] = global_value[offset + i];
3257  }
3258 
3259  // rebuild
3260  this->build(dist_pt);
3261  this->build_without_copy(
3262  ncol, nnz, dist_value, dist_column_index, dist_row_start);
3263  }
3264  }
3265 #endif
3266  }
3267 
3268  //=============================================================================
3269  /// Compute transpose of matrix
3270  //=============================================================================
3272  {
3273  // Get the number of non_zeros
3274  unsigned long nnon_zeros = this->nnz();
3275 
3276  // Find the number of rows and columns in the transposed
3277  // matrix. We differentiate these from those associated
3278  // with the original matrix by appending the characters
3279  // '_t' onto the end
3280  unsigned long n_rows = this->nrow();
3281  unsigned long n_rows_t = this->ncol();
3282 
3283 #ifdef OOMPH_HAS_MPI
3284  // We only need to produce a warning if the matrix is distributed
3285  if (this->distributed())
3286  {
3287  // Create an ostringstream object to store the warning message
3288  std::ostringstream warning_message;
3289 
3290  // Create the warning messsage
3291  warning_message << "This method currently works for serial but "
3292  << "has not been tested with MPI!\n";
3293 
3294  // Issue the warning
3295  OomphLibWarning(warning_message.str(),
3296  OOMPH_CURRENT_FUNCTION,
3297  OOMPH_EXCEPTION_LOCATION);
3298  }
3299 #endif
3300 
3301  // Set up the distribution for the transposed matrix
3302  result->distribution_pt()->build(
3303  this->distribution_pt()->communicator_pt(), n_rows_t, false);
3304 
3305  // Acquire access to the value, row_start and column_index
3306  // arrays from the CR matrix
3307  const double* value_pt = this->value();
3308  const int* row_start_pt = this->row_start();
3309  const int* column_index_pt = this->column_index();
3310 
3311  // Allocate space for the row_start and column_index vectors
3312  // associated with the transpose of the current matrix.
3313  Vector<double> value_t(nnon_zeros, 0.0);
3314  Vector<int> column_index_t(nnon_zeros, 0);
3315  Vector<int> row_start_t(n_rows_t + 1, 0);
3316 
3317  // Loop over the column index vector and count how many times
3318  // each column number occurs and increment the appropriate
3319  // entry in row_start_t whose i+1'th entry will contain the
3320  // number of non-zeros in the i'th column of the matrix (this
3321  // is done so that the cumulative sum done next returns the
3322  // correct result)
3323  for (unsigned i = 0; i < nnon_zeros; i++)
3324  {
3325  // Assign entries to row_start_t (noting the first
3326  // entry is left as 0 for the cumulative sum done later)
3327  row_start_t[*(column_index_pt + i) + 1]++;
3328  }
3329 
3330  // Calculate the sum of the first i entries in the row_start_t
3331  // vector and store the values in the i'th entry of row_start_t
3332  for (unsigned i = 1; i < n_rows_t + 1; i++)
3333  {
3334  // Calculate the cumulative sum
3335  row_start_t[i] += row_start_t[i - 1];
3336  }
3337 
3338  // Allocate space for variables to store the indices of the
3339  // start and end of the non-zeros in a given row of the matrix
3340  unsigned i_row_s = 0;
3341  unsigned i_row_e = 0;
3342 
3343  // Initialise 3 extra variables for readability of the
3344  // code in the subsequent piece of code
3345  unsigned a = 0;
3346  unsigned b = 0;
3347  unsigned c = 0;
3348 
3349  // Vector needed to count the number of entries added
3350  // to each segment in column_index_t where each segment
3351  // is associated with each row in the transposed matrix
3352  Vector<int> counter(n_rows_t, 0);
3353 
3354  // Set the entries in column_index_t. To do this we loop
3355  // over each row of the original matrix (equivalently
3356  // the number of columns in the transpose)
3357  for (unsigned i_row = 0; i_row < n_rows; i_row++)
3358  {
3359  // Here we find the indices of the start and end
3360  // of the non-zeros in i_row'th row of the matrix.
3361  // [Note, there should be a -1 on i_row_e but this
3362  // is ignored so that we use a strict inequality
3363  // in the subsequent for-loop!]
3364  i_row_s = *(row_start_pt + i_row);
3365  i_row_e = *(row_start_pt + i_row + 1);
3366 
3367  // Loop over the entries in the i_row'th row
3368  // of the matrix
3369  for (unsigned j = i_row_s; j < i_row_e; j++)
3370  {
3371  // The value of a is the column index of the j'th
3372  // element in the i_row'th row of the original matrix
3373  // (which is also the row index of the associated
3374  // non-zero in the transposed matrix)
3375  a = *(column_index_pt + j);
3376 
3377  // The value of b will be used to find the start
3378  // of the appropriate segment of column_index_t
3379  // that the non-zero belongs in
3380  b = row_start_t[a];
3381 
3382  // Find the number of elements already added to
3383  // this segment (to find which entry of the segment
3384  // the value i_row, the column index of the non-zero
3385  // in the transposed matrix, should be assigned to)
3386  c = counter[*(column_index_pt + j)];
3387 
3388  // Assign the value i_row to the appropriate entry
3389  // in column_index_t
3390  column_index_t[b + c] = i_row;
3391  value_t[b + c] = *(value_pt + j);
3392 
3393  // Increment the j'th value of the vector counter
3394  // to indicate another non-zero index has been
3395  // added into the
3396  counter[*(column_index_pt + j)]++;
3397 
3398  } // End of for-loop over i_row'th row of the matrix
3399 
3400  } // End of for-loop over rows of the matrix
3401 
3402  // Build the matrix (note: the value of n_cols for the
3403  // transposed matrix is n_rows for the original matrix)
3404  result->build(n_rows, value_t, column_index_t, row_start_t);
3405 
3406  } // End of the function
3407 
3408 
3409  //=============================================================================
3410  /// Compute infinity (maximum) norm of matrix
3411  //=============================================================================
3413  {
3414 #ifdef PARANOID
3415  // paranoid check that the vector is setup
3416  if (!this->distribution_built())
3417  {
3418  std::ostringstream error_message;
3419  error_message << "This matrix must be setup.";
3420  throw OomphLibError(
3421  error_message.str(), OOMPH_CURRENT_FUNCTION, OOMPH_EXCEPTION_LOCATION);
3422  }
3423 #endif
3424 
3425  // compute the local norm
3426  unsigned nrow_local = this->nrow_local();
3427  double n = 0;
3428  const int* row_start = CR_matrix.row_start();
3429  const double* value = CR_matrix.value();
3430  for (unsigned i = 0; i < nrow_local; i++)
3431  {
3432  double a = 0;
3433  for (int j = row_start[i]; j < row_start[i + 1]; j++)
3434  {
3435  a += fabs(value[j]);
3436  }
3437  n = std::max(n, a);
3438  }
3439 
3440  // if this vector is distributed and on multiple processors then gather
3441 #ifdef OOMPH_HAS_MPI
3442  double n2 = n;
3443  if (this->distributed() &&
3444  this->distribution_pt()->communicator_pt()->nproc() > 1)
3445  {
3446  MPI_Allreduce(&n,
3447  &n2,
3448  1,
3449  MPI_DOUBLE,
3450  MPI_MAX,
3451  this->distribution_pt()->communicator_pt()->mpi_comm());
3452  }
3453  n = n2;
3454 #endif
3455 
3456  // and return
3457  return n;
3458  }
3459 
3460  //=============================================================================
3461  /// Return the diagonal entries of the matrix.
3462  /// This only works with square matrices. This condition may be relaxed
3463  /// in the future if need be.
3464  //=============================================================================
3466  {
3467 #ifdef PARANOID
3468  // Check if the matrix has been built.
3469  if (!this->built())
3470  {
3471  std::ostringstream error_message;
3472  error_message << "The matrix has not been built.\n"
3473  << "Please build it...\n";
3474  throw OomphLibError(
3475  error_message.str(), OOMPH_CURRENT_FUNCTION, OOMPH_EXCEPTION_LOCATION);
3476  }
3477 
3478  // Check if this is a square matrix.
3479  if (this->nrow() != this->ncol())
3480  {
3481  std::ostringstream error_message;
3482  error_message << "The matrix is not square. Can only get the diagonal\n"
3483  << "entries of a square matrix.\n";
3484  throw OomphLibError(
3485  error_message.str(), OOMPH_CURRENT_FUNCTION, OOMPH_EXCEPTION_LOCATION);
3486  }
3487 #endif
3488 
3489  // We get the diagonal entries on this processor.
3490  unsigned nrow_local = this->nrow_local();
3491 
3492  // Create storage for the diagonal entries.
3493  Vector<double> result_vec;
3494  result_vec.reserve(nrow_local);
3495 
3496  // Get the first row for the column offset.
3497  unsigned first_row = this->first_row();
3498 
3499  // Loop through the local rows.
3500  for (unsigned i = 0; i < nrow_local; i++)
3501  {
3502  // The column entries are globally indexed.
3503  unsigned diag_entry_col = first_row + i;
3504 
3505  // Push back the diagonal entry.
3506  result_vec.push_back(CR_matrix.get_entry(i, diag_entry_col));
3507  }
3508 
3509  return result_vec;
3510  }
3511 
3512  //=============================================================================
3513  /// Element-wise addition of this matrix with matrix_in.
3514  //=============================================================================
3515  void CRDoubleMatrix::add(const CRDoubleMatrix& matrix_in,
3516  CRDoubleMatrix& result_matrix) const
3517  {
3518 #ifdef PARANOID
3519  // Check if this matrix is built.
3520  if (!this->built())
3521  {
3522  std::ostringstream error_message;
3523  error_message << "The matrix is not built.\n"
3524  << "Please build the matrix!\n";
3525  throw OomphLibError(
3526  error_message.str(), OOMPH_CURRENT_FUNCTION, OOMPH_EXCEPTION_LOCATION);
3527  }
3528 
3529  // Check if this matrix_in is built.
3530  if (!matrix_in.built())
3531  {
3532  std::ostringstream error_message;
3533  error_message << "The matrix matrix_in is not built.\n"
3534  << "Please build the matrix!\n";
3535  throw OomphLibError(
3536  error_message.str(), OOMPH_CURRENT_FUNCTION, OOMPH_EXCEPTION_LOCATION);
3537  }
3538 
3539  // Check if the dimensions of this matrix and matrix_in are the same.
3540  unsigned long this_nrow = this->nrow();
3541  unsigned long matrix_in_nrow = matrix_in.nrow();
3542  if (this_nrow != matrix_in_nrow)
3543  {
3544  std::ostringstream error_message;
3545  error_message << "matrix_in has a different number of rows than\n"
3546  << "this matrix.\n";
3547  throw OomphLibError(
3548  error_message.str(), OOMPH_CURRENT_FUNCTION, OOMPH_EXCEPTION_LOCATION);
3549  }
3550 
3551  unsigned long this_ncol = this->ncol();
3552  unsigned long matrix_in_ncol = matrix_in.ncol();
3553  if (this_ncol != matrix_in_ncol)
3554  {
3555  std::ostringstream error_message;
3556  error_message << "matrix_in has a different number of columns than\n"
3557  << "this matrix.\n";
3558  throw OomphLibError(
3559  error_message.str(), OOMPH_CURRENT_FUNCTION, OOMPH_EXCEPTION_LOCATION);
3560  }
3561 
3562  // Check if the distribution is the same (Otherwise we may have to send and
3563  // receive data from other processors - which is not implemented!)
3564  if (*(this->distribution_pt()) != *(matrix_in.distribution_pt()))
3565  {
3566  std::ostringstream error_message;
3567  error_message << "matrix_in must have the same distribution as\n"
3568  << "this matrix.\n";
3569  throw OomphLibError(
3570  error_message.str(), OOMPH_CURRENT_FUNCTION, OOMPH_EXCEPTION_LOCATION);
3571  }
3572 
3573  // If the matrix is built, check that it's existing distribution is the
3574  // same as the in matrix. Since we'll use the same distribution instead
3575  // of completely rebuilding it.
3576  if (result_matrix.built() &&
3577  (*result_matrix.distribution_pt() != *matrix_in.distribution_pt()))
3578  {
3579  std::ostringstream error_message;
3580  error_message << "The result_matrix is built. "
3581  << "But has a different distribution from matrix_in \n"
3582  << "They need to be the same.\n";
3583  throw OomphLibError(
3584  error_message.str(), OOMPH_CURRENT_FUNCTION, OOMPH_EXCEPTION_LOCATION);
3585  }
3586 #endif
3587 
3588  // To add the elements of two CRDoubleMatrices, we need to know the union of
3589  // the sparsity patterns. This is determined by the column indices.
3590  // We add the column indices and values (entries) as a key-value pair in
3591  // a map (per row). We then read these out into two column indices and
3592  // values vector for the result matrix.
3593 
3594  unsigned nrow_local = this->nrow_local();
3595  Vector<int> res_column_indices;
3596  Vector<double> res_values;
3597  Vector<int> res_row_start;
3598  res_row_start.reserve(nrow_local + 1);
3599 
3600  // The row_start and column_indices
3601  const int* this_column_indices = this->column_index();
3602  const int* this_row_start = this->row_start();
3603  const int* in_column_indices = matrix_in.column_index();
3604  const int* in_row_start = matrix_in.row_start();
3605 
3606  // Values from this matrix and matrix_in.
3607  const double* this_values = this->value();
3608  const double* in_values = matrix_in.value();
3609 
3610 
3611  // The first entry in row_start is always zero.
3612  res_row_start.push_back(0);
3613 
3614  // Loop through the rows of both matrices and insert the column indices and
3615  // values as a key-value pair.
3616  for (unsigned row_i = 0; row_i < nrow_local; row_i++)
3617  {
3618  // Create the map for this row.
3619  std::map<int, double> res_row_map;
3620 
3621  // Insert the column and value pair for this matrix.
3622  for (int i = this_row_start[row_i]; i < this_row_start[row_i + 1]; i++)
3623  {
3624  res_row_map[this_column_indices[i]] = this_values[i];
3625  }
3626 
3627  // Insert the column and value pair for in matrix.
3628  for (int i = in_row_start[row_i]; i < in_row_start[row_i + 1]; i++)
3629  {
3630  res_row_map[in_column_indices[i]] += in_values[i];
3631  }
3632 
3633  // Fill in the row start
3634  res_row_start.push_back(res_row_start.back() + res_row_map.size());
3635 
3636  // Now insert the key into res_column_indices and value into res_values
3637  for (std::map<int, double>::iterator it = res_row_map.begin();
3638  it != res_row_map.end();
3639  ++it)
3640  {
3641  res_column_indices.push_back(it->first);
3642  res_values.push_back(it->second);
3643  }
3644  }
3645 
3646  // Finally build the result_matrix.
3647  if (result_matrix.distribution_pt()->built())
3648  {
3649  // Use the existing distribution.
3650  result_matrix.build(
3651  this->ncol(), res_values, res_column_indices, res_row_start);
3652  }
3653  else
3654  {
3655  // Build with THIS distribution
3656  result_matrix.build(this->distribution_pt(),
3657  this->ncol(),
3658  res_values,
3659  res_column_indices,
3660  res_row_start);
3661  }
3662  }
3663 
3664  //=================================================================
3665  /// Namespace for helper functions for CRDoubleMatrices
3666  //=================================================================
3667  namespace CRDoubleMatrixHelpers
3668  {
3669  //============================================================================
3670  /// Builds a uniformly distributed matrix.
3671  /// A locally replicated matrix is constructed then redistributed using
3672  /// OOMPH-LIB's default uniform row distribution.
3673  /// This is memory intensive thus should be used for
3674  /// testing or small problems only.
3675  //============================================================================
3677  const unsigned& nrow,
3678  const unsigned& ncol,
3679  const OomphCommunicator* const comm_pt,
3680  const Vector<double>& values,
3681  const Vector<int>& column_indices,
3682  const Vector<int>& row_start,
3683  CRDoubleMatrix& matrix_out)
3684  {
3685 #ifdef PARANOID
3686  // Check if the communicator exists.
3687  if (comm_pt == 0)
3688  {
3689  std::ostringstream error_message;
3690  error_message << "Please supply the communicator.\n";
3691  throw OomphLibError(error_message.str(),
3692  OOMPH_CURRENT_FUNCTION,
3693  OOMPH_EXCEPTION_LOCATION);
3694  }
3695  // Is the out matrix built? We need an empty matrix!
3696  if (matrix_out.built())
3697  {
3698  std::ostringstream error_message;
3699  error_message << "The result matrix has been built.\n"
3700  << "Please clear the matrix.\n";
3701  throw OomphLibError(error_message.str(),
3702  OOMPH_CURRENT_FUNCTION,
3703  OOMPH_EXCEPTION_LOCATION);
3704  }
3705 #endif
3706 
3707  // Create the locally replicated distribution.
3708  bool distributed = false;
3709  LinearAlgebraDistribution locally_replicated_distribution(
3710  comm_pt, nrow, distributed);
3711 
3712  // Create the matrix.
3713  matrix_out.build(&locally_replicated_distribution,
3714  ncol,
3715  values,
3716  column_indices,
3717  row_start);
3718 
3719  // Create the distributed distribution.
3720  distributed = true;
3721  LinearAlgebraDistribution distributed_distribution(
3722  comm_pt, nrow, distributed);
3723 
3724  // Redistribute the matrix.
3725  matrix_out.redistribute(&distributed_distribution);
3726  }
3727 
3728  //============================================================================
3729  /// Compute infinity (maximum) norm of sub blocks as if it was one matrix
3730  //============================================================================
3731  double inf_norm(const DenseMatrix<CRDoubleMatrix*>& matrix_pt)
3732  {
3733  // The number of block rows and columns
3734  const unsigned nblockrow = matrix_pt.nrow();
3735  const unsigned nblockcol = matrix_pt.ncol();
3736 
3737 #ifdef PARANOID
3738  // Check that tehre is at least one matrix.
3739  if (matrix_pt.nrow() == 0)
3740  {
3741  std::ostringstream error_message;
3742  error_message << "There are no matrices... \n";
3743  throw OomphLibError(error_message.str(),
3744  OOMPH_CURRENT_FUNCTION,
3745  OOMPH_EXCEPTION_LOCATION);
3746  }
3747 
3748 
3749  // Check that all matrix_pt pointers are not null
3750  // and the matrices are built.
3751  for (unsigned block_row_i = 0; block_row_i < nblockrow; block_row_i++)
3752  {
3753  for (unsigned block_col_i = 0; block_col_i < nblockcol; block_col_i++)
3754  {
3755  if (matrix_pt(block_row_i, block_col_i) == 0)
3756  {
3757  std::ostringstream error_message;
3758  error_message << "The pointer martrix_pt(" << block_row_i << ","
3759  << block_col_i << ") is null.\n";
3760  throw OomphLibError(error_message.str(),
3761  OOMPH_CURRENT_FUNCTION,
3762  OOMPH_EXCEPTION_LOCATION);
3763  }
3764 
3765  if (!matrix_pt(block_row_i, block_col_i)->built())
3766  {
3767  std::ostringstream error_message;
3768  error_message << "The matrix at martrix_pt(" << block_row_i << ","
3769  << block_col_i << ") is not built.\n";
3770  throw OomphLibError(error_message.str(),
3771  OOMPH_CURRENT_FUNCTION,
3772  OOMPH_EXCEPTION_LOCATION);
3773  }
3774  }
3775  }
3776 #endif
3777 
3778 #ifdef OOMPH_HAS_MPI
3779 
3780  // The communicator pointer from block (0,0)
3781  const OomphCommunicator* const comm_pt =
3782  matrix_pt(0, 0)->distribution_pt()->communicator_pt();
3783 
3784 #ifdef PARANOID
3785 
3786 
3787  // Check that all communicators are the same
3788  for (unsigned block_row_i = 0; block_row_i < nblockrow; block_row_i++)
3789  {
3790  for (unsigned block_col_i = 0; block_col_i < nblockcol; block_col_i++)
3791  {
3792  // Communicator for this block matrix.
3793  const OomphCommunicator current_block_comm =
3794  *(matrix_pt(block_row_i, block_col_i)
3795  ->distribution_pt()
3796  ->communicator_pt());
3797  if (*comm_pt != current_block_comm)
3798  {
3799  std::ostringstream error_message;
3800  error_message << "The communicator of block martrix_pt("
3801  << block_row_i << "," << block_col_i
3802  << ") is not the same as block "
3803  << "matrix_pt(0,0).\n";
3804  throw OomphLibError(error_message.str(),
3805  OOMPH_CURRENT_FUNCTION,
3806  OOMPH_EXCEPTION_LOCATION);
3807  }
3808  }
3809  }
3810 
3811  // Check that all distributed boolean are the same (if on more than 1
3812  // core)
3813  if (comm_pt->nproc() > 1)
3814  {
3815  // Get the distributed boolean from matrix_pt(0,0)
3816  bool first_distributed = matrix_pt(0, 0)->distributed();
3817 
3818  for (unsigned block_row_i = 0; block_row_i < nblockrow; block_row_i++)
3819  {
3820  for (unsigned block_col_i = 0; block_col_i < nblockcol; block_col_i++)
3821  {
3822  // Is the current block distributed?
3823  bool current_distributed =
3824  matrix_pt(block_row_i, block_col_i)->distributed();
3825 
3826  if (first_distributed != current_distributed)
3827  {
3828  std::ostringstream error_message;
3829  error_message << "Block matrix_pt(" << block_row_i << ","
3830  << block_col_i << ") and block matrix_pt(0,0) "
3831  << "have a different distributed boolean.\n";
3832  throw OomphLibError(error_message.str(),
3833  OOMPH_CURRENT_FUNCTION,
3834  OOMPH_EXCEPTION_LOCATION);
3835  }
3836  }
3837  }
3838  }
3839 
3840  // Check that all sub matrix dimensions "make sense"
3841  // We need to check that all the matrices in the same row has the same
3842  // nrow. Then repeat for the columns.
3843 
3844  // Check the nrow of each block row.
3845  for (unsigned block_row_i = 0; block_row_i < nblockrow; block_row_i++)
3846  {
3847  // Get the nrow to compare against from the first column.
3848  const unsigned first_block_nrow = matrix_pt(block_row_i, 0)->nrow();
3849 
3850  // Loop through the block columns.
3851  for (unsigned block_col_i = 1; block_col_i < nblockcol; block_col_i++)
3852  {
3853  // If the nrow of this block is not the same as the nrow from the
3854  // first block in this block row, throw an error.
3855  const unsigned current_block_nrow =
3856  matrix_pt(block_row_i, block_col_i)->nrow();
3857 
3858  if (first_block_nrow != current_block_nrow)
3859  {
3860  std::ostringstream error_message;
3861  error_message << "First block has nrow = " << current_block_nrow
3862  << ". But martrix_pt(" << block_row_i << ","
3863  << block_col_i
3864  << ") has nrow = " << current_block_nrow << ".\n";
3865  throw OomphLibError(error_message.str(),
3866  OOMPH_CURRENT_FUNCTION,
3867  OOMPH_EXCEPTION_LOCATION);
3868  }
3869  }
3870  }
3871 
3872  // Check the ncol of each block column.
3873  for (unsigned block_col_i = 0; block_col_i < nblockcol; block_col_i++)
3874  {
3875  // Get the ncol from the first block row to compare against.
3876  const unsigned first_block_ncol = matrix_pt(0, block_col_i)->ncol();
3877 
3878  for (unsigned block_row_i = 1; block_row_i < nblockrow; block_row_i++)
3879  {
3880  // Get the ncol for the current block.
3881  const unsigned current_block_ncol =
3882  matrix_pt(block_row_i, block_col_i)->ncol();
3883 
3884  if (first_block_ncol != current_block_ncol)
3885  {
3886  std::ostringstream error_message;
3887  error_message << "First block has ncol = " << current_block_ncol
3888  << ". But martrix_pt(" << block_row_i << ","
3889  << block_col_i
3890  << ") has ncol = " << current_block_ncol << ".\n";
3891  throw OomphLibError(error_message.str(),
3892  OOMPH_CURRENT_FUNCTION,
3893  OOMPH_EXCEPTION_LOCATION);
3894  }
3895  }
3896  }
3897 
3898  // Check that the distribution for each block row is the same.
3899  for (unsigned block_row_i = 0; block_row_i < nblockrow; block_row_i++)
3900  {
3901  // The first distribution of this block row.
3902  const LinearAlgebraDistribution first_dist =
3903  *(matrix_pt(block_row_i, 0)->distribution_pt());
3904 
3905  // Loop through the rest of the block columns.
3906  for (unsigned block_col_i = 1; block_col_i < nblockcol; block_col_i++)
3907  {
3908  // Get the distribution from the current block.
3909  const LinearAlgebraDistribution current_dist =
3910  matrix_pt(block_row_i, block_col_i)->distribution_pt();
3911 
3912  // Compare the first distribution against the current.
3913  if (first_dist != current_dist)
3914  {
3915  std::ostringstream error_message;
3916  error_message << "First distribution of block row " << block_row_i
3917  << " is different from the distribution from "
3918  << "martrix_pt(" << block_row_i << "," << block_col_i
3919  << ").\n";
3920  throw OomphLibError(error_message.str(),
3921  OOMPH_CURRENT_FUNCTION,
3922  OOMPH_EXCEPTION_LOCATION);
3923  }
3924  }
3925  }
3926 #endif
3927 
3928 #endif
3929 
3930  // Loop thrpugh the block rows, then block columns to
3931  // compute the local inf norm
3932  double inf_norm = 0;
3933  for (unsigned block_row_i = 0; block_row_i < nblockrow; block_row_i++)
3934  {
3935  // Get the number of local rows from the first block.
3936  unsigned block_nrow_local = matrix_pt(block_row_i, 0)->nrow_local();
3937 
3938  // Loop through the block_nrow_local in this block row
3939  for (unsigned local_row_i = 0; local_row_i < block_nrow_local;
3940  local_row_i++)
3941  {
3942  double abs_sum_of_row = 0;
3943  // Loop through the block columns
3944  for (unsigned block_col_i = 0; block_col_i < nblockcol; block_col_i++)
3945  {
3946  // Locally cache the pointer to the current block.
3947  CRDoubleMatrix* block_pt = matrix_pt(block_row_i, block_col_i);
3948 
3949  const int* row_start = block_pt->row_start();
3950  const double* value = block_pt->value();
3951 
3952  // Loop through the values
3953  for (int val_i = row_start[local_row_i];
3954  val_i < row_start[local_row_i + 1];
3955  val_i++)
3956  {
3957  abs_sum_of_row += fabs(value[val_i]);
3958  }
3959  }
3960  // Store the max row
3961  inf_norm = std::max(inf_norm, abs_sum_of_row);
3962  }
3963  }
3964 
3965  // if this vector is distributed and on multiple processors then gather
3966 #ifdef OOMPH_HAS_MPI
3967  double inf_norm_local = inf_norm;
3968  if (matrix_pt(0, 0)->distributed() && comm_pt->nproc() > 1)
3969  {
3970  MPI_Allreduce(&inf_norm,
3971  &inf_norm_local,
3972  1,
3973  MPI_DOUBLE,
3974  MPI_MAX,
3975  comm_pt->mpi_comm());
3976  }
3977  inf_norm = inf_norm_local;
3978 #endif
3979 
3980  // and return
3981  return inf_norm;
3982  }
3983 
3984  //============================================================================
3985  /// Calculates the largest Gershgorin disc whilst preserving the sign. Let
3986  /// A be an n by n matrix, with entries aij. For \f$ i \in \{ 1,...,n \} \f$
3987  /// let \f$ R_i = \sum_{i\neq j} |a_{ij}| \f$ be the sum of the absolute
3988  /// values of the non-diagonal entries in the i-th row. Let \f$ D(a_{ii},R_i) \f$
3989  /// be the closed disc centered at \f$ a_{ii} \f$ with radius \f$ R_i \f$,
3990  /// such a disc is called a Gershgorin disc.
3991  ///
3992  /// \n
3993  ///
3994  /// We calculate \f$ |D(a_{ii},R_i)|_{max} \f$ and multiply by the sign of
3995  /// the diagonal entry.
3996  ///
3997  /// \n
3998  ///
3999  /// The DenseMatrix of CRDoubleMatrices are treated as if they are one
4000  /// large matrix. Therefore the dimensions of the sub matrices has to
4001  /// "make sense", there is a paranoid check for this.
4002  //============================================================================
4004  const DenseMatrix<CRDoubleMatrix*>& matrix_pt)
4005  {
4006  // The number of block rows and columns
4007  const unsigned nblockrow = matrix_pt.nrow();
4008  const unsigned nblockcol = matrix_pt.ncol();
4009 
4010 #ifdef PARANOID
4011  // Check that tehre is at least one matrix.
4012  if (matrix_pt.nrow() == 0)
4013  {
4014  std::ostringstream error_message;
4015  error_message << "There are no matrices... \n";
4016  throw OomphLibError(error_message.str(),
4017  OOMPH_CURRENT_FUNCTION,
4018  OOMPH_EXCEPTION_LOCATION);
4019  }
4020 
4021 
4022  // Check that all matrix_pt pointers are not null
4023  // and the matrices are built.
4024  for (unsigned block_row_i = 0; block_row_i < nblockrow; block_row_i++)
4025  {
4026  for (unsigned block_col_i = 0; block_col_i < nblockcol; block_col_i++)
4027  {
4028  if (matrix_pt(block_row_i, block_col_i) == 0)
4029  {
4030  std::ostringstream error_message;
4031  error_message << "The pointer martrix_pt(" << block_row_i << ","
4032  << block_col_i << ") is null.\n";
4033  throw OomphLibError(error_message.str(),
4034  OOMPH_CURRENT_FUNCTION,
4035  OOMPH_EXCEPTION_LOCATION);
4036  }
4037 
4038  if (!matrix_pt(block_row_i, block_col_i)->built())
4039  {
4040  std::ostringstream error_message;
4041  error_message << "The matrix at martrix_pt(" << block_row_i << ","
4042  << block_col_i << ") is not built.\n";
4043  throw OomphLibError(error_message.str(),
4044  OOMPH_CURRENT_FUNCTION,
4045  OOMPH_EXCEPTION_LOCATION);
4046  }
4047  }
4048  }
4049 #endif
4050 
4051 
4052 #ifdef OOMPH_HAS_MPI
4053 
4054  // The communicator pointer from block (0,0)
4055  // All communicators should be the same, we check this next.
4056  const OomphCommunicator* const comm_pt =
4057  matrix_pt(0, 0)->distribution_pt()->communicator_pt();
4058 
4059 #ifdef PARANOID
4060 
4061  // Check that all communicators are the same
4062  for (unsigned block_row_i = 0; block_row_i < nblockrow; block_row_i++)
4063  {
4064  for (unsigned block_col_i = 0; block_col_i < nblockcol; block_col_i++)
4065  {
4066  // Communicator for this block matrix.
4067  const OomphCommunicator current_block_comm =
4068  *(matrix_pt(block_row_i, block_col_i)
4069  ->distribution_pt()
4070  ->communicator_pt());
4071  if (*comm_pt != current_block_comm)
4072  {
4073  std::ostringstream error_message;
4074  error_message << "The communicator of block martrix_pt("
4075  << block_row_i << "," << block_col_i
4076  << ") is not the same as block "
4077  << "matrix_pt(0,0).\n";
4078  throw OomphLibError(error_message.str(),
4079  OOMPH_CURRENT_FUNCTION,
4080  OOMPH_EXCEPTION_LOCATION);
4081  }
4082  }
4083  }
4084 
4085  // Check that all distributed boolean are the same (if on more than 1
4086  // core)
4087  if (comm_pt->nproc() > 1)
4088  {
4089  // Get the distributed boolean from matrix_pt(0,0)
4090  bool first_distributed = matrix_pt(0, 0)->distributed();
4091 
4092  for (unsigned block_row_i = 0; block_row_i < nblockrow; block_row_i++)
4093  {
4094  for (unsigned block_col_i = 0; block_col_i < nblockcol; block_col_i++)
4095  {
4096  // Is the current block distributed?
4097  bool current_distributed =
4098  matrix_pt(block_row_i, block_col_i)->distributed();
4099 
4100  if (first_distributed != current_distributed)
4101  {
4102  std::ostringstream error_message;
4103  error_message << "Block matrix_pt(" << block_row_i << ","
4104  << block_col_i << ") and block matrix_pt(0,0) "
4105  << "have a different distributed boolean.\n";
4106  throw OomphLibError(error_message.str(),
4107  OOMPH_CURRENT_FUNCTION,
4108  OOMPH_EXCEPTION_LOCATION);
4109  }
4110  }
4111  }
4112  }
4113 
4114  // Check that all sub matrix dimensions "make sense"
4115  // We need to check that all the matrices in the same row has the same
4116  // nrow. Then repeat for the columns.
4117 
4118  // Check the nrow of each block row.
4119  for (unsigned block_row_i = 0; block_row_i < nblockrow; block_row_i++)
4120  {
4121  // Get the nrow to compare against from the first column.
4122  const unsigned first_block_nrow = matrix_pt(block_row_i, 0)->nrow();
4123 
4124  // Loop through the block columns.
4125  for (unsigned block_col_i = 1; block_col_i < nblockcol; block_col_i++)
4126  {
4127  // If the nrow of this block is not the same as the nrow from the
4128  // first block in this block row, throw an error.
4129  const unsigned current_block_nrow =
4130  matrix_pt(block_row_i, block_col_i)->nrow();
4131 
4132  if (first_block_nrow != current_block_nrow)
4133  {
4134  std::ostringstream error_message;
4135  error_message << "First block has nrow = " << current_block_nrow
4136  << ". But martrix_pt(" << block_row_i << ","
4137  << block_col_i
4138  << ") has nrow = " << current_block_nrow << ".\n";
4139  throw OomphLibError(error_message.str(),
4140  OOMPH_CURRENT_FUNCTION,
4141  OOMPH_EXCEPTION_LOCATION);
4142  }
4143  }
4144  }
4145 
4146  // Check the ncol of each block column.
4147  for (unsigned block_col_i = 0; block_col_i < nblockcol; block_col_i++)
4148  {
4149  // Get the ncol from the first block row to compare against.
4150  const unsigned first_block_ncol = matrix_pt(0, block_col_i)->ncol();
4151 
4152  for (unsigned block_row_i = 1; block_row_i < nblockrow; block_row_i++)
4153  {
4154  // Get the ncol for the current block.
4155  const unsigned current_block_ncol =
4156  matrix_pt(block_row_i, block_col_i)->ncol();
4157 
4158  if (first_block_ncol != current_block_ncol)
4159  {
4160  std::ostringstream error_message;
4161  error_message << "First block has ncol = " << current_block_ncol
4162  << ". But martrix_pt(" << block_row_i << ","
4163  << block_col_i
4164  << ") has ncol = " << current_block_ncol << ".\n";
4165  throw OomphLibError(error_message.str(),
4166  OOMPH_CURRENT_FUNCTION,
4167  OOMPH_EXCEPTION_LOCATION);
4168  }
4169  }
4170  }
4171 
4172  // Check that the distribution for each block row is the same.
4173  for (unsigned block_row_i = 0; block_row_i < nblockrow; block_row_i++)
4174  {
4175  // The first distribution of this block row.
4176  const LinearAlgebraDistribution first_dist =
4177  *(matrix_pt(block_row_i, 0)->distribution_pt());
4178 
4179  // Loop through the rest of the block columns.
4180  for (unsigned block_col_i = 1; block_col_i < nblockcol; block_col_i++)
4181  {
4182  // Get the distribution from the current block.
4183  const LinearAlgebraDistribution current_dist =
4184  matrix_pt(block_row_i, block_col_i)->distribution_pt();
4185 
4186  // Compare the first distribution against the current.
4187  if (first_dist != current_dist)
4188  {
4189  std::ostringstream error_message;
4190  error_message << "First distribution of block row " << block_row_i
4191  << " is different from the distribution from "
4192  << "martrix_pt(" << block_row_i << "," << block_col_i
4193  << ").\n";
4194  throw OomphLibError(error_message.str(),
4195  OOMPH_CURRENT_FUNCTION,
4196  OOMPH_EXCEPTION_LOCATION);
4197  }
4198  }
4199  }
4200 
4201 #endif
4202 #endif
4203 
4204  // Loop thrpugh the block rows, then block columns to
4205  // compute the local inf norm
4206  double extreme_disc = 0;
4207  for (unsigned block_row_i = 0; block_row_i < nblockrow; block_row_i++)
4208  {
4209  // Get the number of local rows from the first block.
4210  unsigned block_nrow_local = matrix_pt(block_row_i, 0)->nrow_local();
4211 
4212  // Loop through the block_nrow_local in this block row
4213  for (unsigned local_row_i = 0; local_row_i < block_nrow_local;
4214  local_row_i++)
4215  {
4216  double abs_sum_of_row = 0;
4217  // Loop through the block columns
4218  for (unsigned block_col_i = 0; block_col_i < nblockcol; block_col_i++)
4219  {
4220  // Locally cache the pointer to the current block.
4221  CRDoubleMatrix* block_pt = matrix_pt(block_row_i, block_col_i);
4222 
4223  const int* row_start = block_pt->row_start();
4224  const double* value = block_pt->value();
4225 
4226  // Loop through the values
4227  for (int val_i = row_start[local_row_i];
4228  val_i < row_start[local_row_i + 1];
4229  val_i++)
4230  {
4231  abs_sum_of_row += fabs(value[val_i]);
4232  }
4233  }
4234 
4235  // Now minus the diagonal entry...
4236  // Locate the diagonal block matrix.
4237  double* s_values = matrix_pt(block_row_i, block_row_i)->value();
4238  int* s_column_index =
4239  matrix_pt(block_row_i, block_row_i)->column_index();
4240  int* s_row_start = matrix_pt(block_row_i, block_row_i)->row_start();
4241  // int s_nrow_local =
4242  // matrix_pt(block_row_i,block_row_i)->nrow_local();
4243  int s_first_row = matrix_pt(block_row_i, block_row_i)->first_row();
4244 
4245  // Get the diagonal value...
4246  double diagonal_value = 0;
4247  bool found = false;
4248  for (int j = s_row_start[local_row_i];
4249  j < s_row_start[local_row_i + 1] && !found;
4250  j++)
4251  {
4252  if (s_column_index[j] == int(local_row_i + s_first_row))
4253  {
4254  diagonal_value = s_values[j];
4255  found = true;
4256  }
4257  }
4258 
4259  // Check if the diagonal entry is found.
4260  if (!found)
4261  {
4262  std::ostringstream error_message;
4263  error_message << "The diagonal entry for the block(" << block_row_i
4264  << "," << block_row_i << ")\n"
4265  << "on local row " << local_row_i
4266  << " does not exist." << std::endl;
4267  throw OomphLibError(error_message.str(),
4268  OOMPH_CURRENT_FUNCTION,
4269  OOMPH_EXCEPTION_LOCATION);
4270  }
4271 
4272  // This is the disc.
4273  abs_sum_of_row -= fabs(diagonal_value);
4274 
4275  // Now we have to check if the diagonal entry is
4276  // on the left or right side of zero.
4277  if (diagonal_value > 0)
4278  {
4279  double extreme_disc_local = diagonal_value + abs_sum_of_row;
4280  extreme_disc = std::max(extreme_disc_local, extreme_disc);
4281  }
4282  else
4283  {
4284  double extreme_disc_local = diagonal_value - abs_sum_of_row;
4285  extreme_disc = std::min(extreme_disc_local, extreme_disc);
4286  }
4287  } // Loop through local row (of all block column)
4288  } // Loop through block row
4289 
4290  // if this vector is distributed and on multiple processors then gather
4291 #ifdef OOMPH_HAS_MPI
4292  double extreme_disc_local = extreme_disc;
4293  if (matrix_pt(0, 0)->distributed() && comm_pt->nproc() > 1)
4294  {
4295  if (extreme_disc > 0)
4296  {
4297  MPI_Allreduce(&extreme_disc,
4298  &extreme_disc_local,
4299  1,
4300  MPI_DOUBLE,
4301  MPI_MAX,
4302  comm_pt->mpi_comm());
4303  }
4304  else
4305  {
4306  MPI_Allreduce(&extreme_disc,
4307  &extreme_disc_local,
4308  1,
4309  MPI_DOUBLE,
4310  MPI_MIN,
4311  comm_pt->mpi_comm());
4312  }
4313  }
4314  extreme_disc = extreme_disc_local;
4315 #endif
4316 
4317  // and return
4318  return extreme_disc;
4319  }
4320 
4321  //============================================================================
4322  /// Concatenate CRDoubleMatrix matrices.
4323  /// The in matrices are concatenated such that the block structure of the
4324  /// in matrices are preserved in the result matrix. Communication between
4325  /// processors is required. If the block structure of the sub matrices does
4326  /// not need to be preserved, consider using
4327  /// CRDoubleMatrixHelpers::concatenate_without_communication(...).
4328  ///
4329  /// The matrix manipulation functions
4330  /// CRDoubleMatrixHelpers::concatenate(...) and
4331  /// CRDoubleMatrixHelpers::concatenate_without_communication(...)
4332  /// are analogous to the Vector manipulation functions
4333  /// DoubleVectorHelpers::concatenate(...) and
4334  /// DoubleVectorHelpers::concatenate_without_communication(...).
4335  /// Please look at the DoubleVector functions for an illustration of the
4336  /// differences between concatenate(...) and
4337  /// concatenate_without_communication(...).
4338  ///
4339  /// Distribution of the result matrix:
4340  /// If the result matrix does not have a distribution built, then it will be
4341  /// given a uniform row distribution. Otherwise we use the existing
4342  /// distribution. This gives the user the ability to define their own
4343  /// distribution, or save computing power if a distribution has
4344  /// been pre-built.
4345  ///
4346  /// NOTE: ALL the matrices pointed to by matrix_pt has to be built. This is
4347  /// not the case with concatenate_without_communication(...)
4348  //============================================================================
4350  CRDoubleMatrix& result_matrix)
4351  {
4352  // The number of block rows and block columns.
4353  unsigned matrix_nrow = matrix_pt.nrow();
4354  unsigned matrix_ncol = matrix_pt.ncol();
4355 
4356  // PARANOID checks involving only the in matrices.
4357 #ifdef PARANOID
4358  // Are there matrices to concatenate?
4359  if (matrix_nrow == 0)
4360  {
4361  std::ostringstream error_message;
4362  error_message << "There are no matrices to concatenate.\n";
4363  throw OomphLibError(error_message.str(),
4364  OOMPH_CURRENT_FUNCTION,
4365  OOMPH_EXCEPTION_LOCATION);
4366  }
4367 
4368  // Does this matrix need concatenating?
4369  if ((matrix_nrow == 1) && (matrix_ncol == 1))
4370  {
4371  std::ostringstream warning_message;
4372  warning_message << "There is only one matrix to concatenate...\n"
4373  << "This does not require concatenating...\n";
4374  OomphLibWarning(warning_message.str(),
4375  OOMPH_CURRENT_FUNCTION,
4376  OOMPH_EXCEPTION_LOCATION);
4377  }
4378 
4379  // Are all sub matrices built?
4380  for (unsigned block_row_i = 0; block_row_i < matrix_nrow; block_row_i++)
4381  {
4382  for (unsigned block_col_i = 0; block_col_i < matrix_ncol; block_col_i++)
4383  {
4384  if (!(matrix_pt(block_row_i, block_col_i)->built()))
4385  {
4386  std::ostringstream error_message;
4387  error_message << "The sub matrix (" << block_row_i << ","
4388  << block_col_i << ")\n"
4389  << "is not built. \n";
4390  throw OomphLibError(error_message.str(),
4391  OOMPH_CURRENT_FUNCTION,
4392  OOMPH_EXCEPTION_LOCATION);
4393  }
4394  }
4395  }
4396 
4397  // Do all dimensions of sub matrices "make sense"?
4398  // Compare the number of rows of each block matrix in a block row.
4399  for (unsigned block_row_i = 0; block_row_i < matrix_nrow; block_row_i++)
4400  {
4401  // Use the first column to compare against the rest.
4402  unsigned long current_block_nrow = matrix_pt(block_row_i, 0)->nrow();
4403 
4404  // Compare against columns 1 to matrix_ncol - 1
4405  for (unsigned block_col_i = 1; block_col_i < matrix_ncol; block_col_i++)
4406  {
4407  // Get the nrow for this sub block.
4408  unsigned long subblock_nrow =
4409  matrix_pt(block_row_i, block_col_i)->nrow();
4410 
4411  if (current_block_nrow != subblock_nrow)
4412  {
4413  std::ostringstream error_message;
4414  error_message << "The sub matrix (" << block_row_i << ","
4415  << block_col_i << ")\n"
4416  << "requires nrow = " << current_block_nrow
4417  << ", but has nrow = " << subblock_nrow << ".\n";
4418  throw OomphLibError(error_message.str(),
4419  OOMPH_CURRENT_FUNCTION,
4420  OOMPH_EXCEPTION_LOCATION);
4421  }
4422  }
4423  }
4424 
4425  // Compare the number of columns of each block matrix in a block column.
4426  for (unsigned block_col_i = 0; block_col_i < matrix_ncol; block_col_i++)
4427  {
4428  // Use the first row to compare against the rest.
4429  unsigned long current_block_ncol = matrix_pt(0, block_col_i)->ncol();
4430 
4431  // Compare against rows 1 to matrix_nrow - 1
4432  for (unsigned block_row_i = 1; block_row_i < matrix_nrow; block_row_i++)
4433  {
4434  // Get the ncol for this sub block.
4435  unsigned long subblock_ncol =
4436  matrix_pt(block_row_i, block_col_i)->ncol();
4437 
4438  if (current_block_ncol != subblock_ncol)
4439  {
4440  std::ostringstream error_message;
4441  error_message << "The sub matrix (" << block_row_i << ","
4442  << block_col_i << ")\n"
4443  << "requires ncol = " << current_block_ncol
4444  << ", but has ncol = " << subblock_ncol << ".\n";
4445  throw OomphLibError(error_message.str(),
4446  OOMPH_CURRENT_FUNCTION,
4447  OOMPH_EXCEPTION_LOCATION);
4448  }
4449  }
4450  }
4451 #endif
4452 
4453  // The communicator pointer from block (0,0)
4454  const OomphCommunicator* const comm_pt =
4455  matrix_pt(0, 0)->distribution_pt()->communicator_pt();
4456 
4457  // Check if the block (0,0) is distributed or not.
4458  bool distributed = matrix_pt(0, 0)->distributed();
4459 
4460  // If the result matrix does not have a distribution, we create a uniform
4461  // distribution.
4462  if (!result_matrix.distribution_pt()->built())
4463  {
4464  // Sum of sub matrix nrow. We use the first column.
4465  unsigned tmp_nrow = 0;
4466  for (unsigned block_row_i = 0; block_row_i < matrix_nrow; block_row_i++)
4467  {
4468  tmp_nrow += matrix_pt(block_row_i, 0)->nrow();
4469  }
4470 
4471  LinearAlgebraDistribution tmp_distribution(
4472  comm_pt, tmp_nrow, distributed);
4473 
4474  result_matrix.build(&tmp_distribution);
4475  }
4476  else
4477  // A distribution is supplied for the result matrix.
4478  {
4479 #ifdef PARANOID
4480  // Check if the sum of the nrow from the sub matrices is the same as the
4481  // the nrow from the result matrix.
4482 
4483  // Sum of sub matrix nrow. We use the first column.
4484  unsigned tmp_nrow = 0;
4485  for (unsigned block_row_i = 0; block_row_i < matrix_nrow; block_row_i++)
4486  {
4487  tmp_nrow += matrix_pt(block_row_i, 0)->nrow();
4488  }
4489 
4490  if (tmp_nrow != result_matrix.nrow())
4491  {
4492  std::ostringstream error_message;
4493  error_message << "The total number of rows from the matrices to\n"
4494  << "concatenate does not match the nrow from the\n"
4495  << "result matrix\n";
4496  throw OomphLibError(error_message.str(),
4497  OOMPH_CURRENT_FUNCTION,
4498  OOMPH_EXCEPTION_LOCATION);
4499  }
4500 #endif
4501  }
4502 
4503 #ifdef PARANOID
4504 
4505  // Are all the communicators the same?
4506  // Compare the communicator for sub matrices (against the result matrix).
4507  {
4508  const OomphCommunicator communicator =
4509  *(result_matrix.distribution_pt()->communicator_pt());
4510 
4511  // Are all communicator pointers the same?
4512  for (unsigned block_row_i = 0; block_row_i < matrix_nrow; block_row_i++)
4513  {
4514  for (unsigned block_col_i = 0; block_col_i < matrix_ncol;
4515  block_col_i++)
4516  {
4517  const OomphCommunicator another_communicator =
4518  *(matrix_pt(block_row_i, block_col_i)
4519  ->distribution_pt()
4520  ->communicator_pt());
4521 
4522  if (!(communicator == another_communicator))
4523  {
4524  std::ostringstream error_message;
4525  error_message << "The OomphCommunicator of the sub matrix ("
4526  << block_row_i << "," << block_col_i << ")\n"
4527  << "does not have the same communicator as the "
4528  "result matrix. \n";
4529  throw OomphLibError(error_message.str(),
4530  OOMPH_CURRENT_FUNCTION,
4531  OOMPH_EXCEPTION_LOCATION);
4532  }
4533  }
4534  }
4535  }
4536 
4537  // Are all the distributed boolean the same? This only applies if we have
4538  // more than one processor. If there is only one processor, then it does
4539  // not matter if it is distributed or not - they are conceptually the
4540  // same.
4541  if (comm_pt->nproc() != 1)
4542  {
4543  // Compare distributed for sub matrices (against the result matrix).
4544  const bool res_distributed = result_matrix.distributed();
4545 
4546  // Loop over all sub blocks.
4547  for (unsigned block_row_i = 0; block_row_i < matrix_nrow; block_row_i++)
4548  {
4549  for (unsigned block_col_i = 0; block_col_i < matrix_ncol;
4550  block_col_i++)
4551  {
4552  const bool another_distributed =
4553  matrix_pt(block_row_i, block_col_i)->distributed();
4554 
4555  if (res_distributed != another_distributed)
4556  {
4557  std::ostringstream error_message;
4558  error_message << "The distributed boolean of the sub matrix ("
4559  << block_row_i << "," << block_col_i << ")\n"
4560  << "is not the same as the result matrix. \n";
4561  throw OomphLibError(error_message.str(),
4562  OOMPH_CURRENT_FUNCTION,
4563  OOMPH_EXCEPTION_LOCATION);
4564  }
4565  }
4566  }
4567  }
4568 #endif
4569 
4570 
4571  // Get the number of columns up to each block for offset
4572  // in calculating the result column indices.
4573  // Since the number of columns in each block column is the same,
4574  // we only loop through the first block row (row zero).
4575  Vector<unsigned long> sum_of_ncol_up_to_block(matrix_ncol);
4576 
4577  // Also compute the total number of columns to build the resulting matrix.
4578  unsigned long res_ncol = 0;
4579 
4580  for (unsigned block_col_i = 0; block_col_i < matrix_ncol; block_col_i++)
4581  {
4582  sum_of_ncol_up_to_block[block_col_i] = res_ncol;
4583  res_ncol += matrix_pt(0, block_col_i)->ncol();
4584  }
4585 
4586  // We begin the process of extracting and ordering the values,
4587  // column_indices and row_start of all the sub blocks.
4588  if ((comm_pt->nproc() == 1) || !distributed)
4589  // Serial version of the code.
4590  {
4591  // Get the total number of non zero entries so we can reserve storage
4592  // for the values and column_indices vectors.
4593  unsigned long res_nnz = 0;
4594  for (unsigned block_row_i = 0; block_row_i < matrix_nrow; block_row_i++)
4595  {
4596  for (unsigned block_col_i = 0; block_col_i < matrix_ncol;
4597  block_col_i++)
4598  {
4599  res_nnz += matrix_pt(block_row_i, block_col_i)->nnz();
4600  }
4601  }
4602 
4603  // Declare the vectors required to build a CRDoubleMatrix
4604  Vector<double> res_values;
4605  Vector<int> res_column_indices;
4606  Vector<int> res_row_start;
4607 
4608  // Reserve space for the vectors.
4609  res_values.reserve(res_nnz);
4610  res_column_indices.reserve(res_nnz);
4611  res_row_start.reserve(result_matrix.nrow() + 1);
4612 
4613  // Now we fill in the data.
4614 
4615  // Running sum of nnz per row.
4616  int nnz_running_sum = 0;
4617 
4618  // Loop through the block rows.
4619  for (unsigned block_row_i = 0; block_row_i < matrix_nrow; block_row_i++)
4620  {
4621  // Get the number of rows in this block row, from the first block.
4622  unsigned long block_row_nrow = matrix_pt(block_row_i, 0)->nrow();
4623 
4624  // Loop through the number of rows in this block row
4625  for (unsigned row_i = 0; row_i < block_row_nrow; row_i++)
4626  {
4627  // The row start is the nnz at the start of the row.
4628  res_row_start.push_back(nnz_running_sum);
4629 
4630  // Loop through the block columns
4631  for (unsigned block_col_i = 0; block_col_i < matrix_ncol;
4632  block_col_i++)
4633  {
4634  // Get the current block.
4635  CRDoubleMatrix* current_block_pt =
4636  matrix_pt(block_row_i, block_col_i);
4637 
4638  // Get the values, column_indices and row_start for this block.
4639  double* current_block_values = current_block_pt->value();
4640  int* current_block_column_indices =
4641  current_block_pt->column_index();
4642  int* current_block_row_start = current_block_pt->row_start();
4643 
4644  for (int val_i = current_block_row_start[row_i];
4645  val_i < current_block_row_start[row_i + 1];
4646  val_i++)
4647  {
4648  res_values.push_back(current_block_values[val_i]);
4649  res_column_indices.push_back(
4650  current_block_column_indices[val_i] +
4651  sum_of_ncol_up_to_block[block_col_i]);
4652  }
4653 
4654  // Update the running sum of nnz per row
4655  nnz_running_sum += current_block_row_start[row_i + 1] -
4656  current_block_row_start[row_i];
4657  } // for block cols
4658  } // for rows
4659  } // for block rows
4660 
4661  // Fill in the last row start
4662  res_row_start.push_back(res_nnz);
4663 
4664  // Build the matrix
4665  result_matrix.build(
4666  res_ncol, res_values, res_column_indices, res_row_start);
4667  }
4668  // Otherwise we are dealing with a distributed matrix.
4669  else
4670  {
4671 #ifdef OOMPH_HAS_MPI
4672 
4673  // Flag to enable timing. This is for debugging
4674  // and/or testing purposes only.
4675  bool enable_timing = false;
4676 
4677  // Get the number of processors
4678  unsigned nproc = comm_pt->nproc();
4679 
4680  // My rank
4681  unsigned my_rank = comm_pt->my_rank();
4682 
4683  // Storage for the data (per processor) to send.
4684  Vector<Vector<unsigned>> column_indices_to_send(nproc);
4685  Vector<Vector<double>> values_to_send(nproc);
4686 
4687  // The sum of the nrow for the sub blocks (so far). This is used as an
4688  // offset to calculate the global equation number in the result matrix.
4689  unsigned long sum_of_block_nrow = 0;
4690 
4691  double t_prep_data_start;
4692  if (enable_timing)
4693  {
4694  t_prep_data_start = TimingHelpers::timer();
4695  }
4696 
4697  // Get the pointer to the result distribution, for convenience...
4698  LinearAlgebraDistribution* res_distribution_pt =
4699  result_matrix.distribution_pt();
4700 
4701  // loop over the sub blocks to calculate the global_eqn, get the values
4702  // and column indices.
4703  for (unsigned block_row_i = 0; block_row_i < matrix_nrow; block_row_i++)
4704  {
4705  // Get the number of local rows in this block_row from the first
4706  // block.
4707  unsigned current_block_nrow_local =
4708  matrix_pt(block_row_i, 0)->nrow_local();
4709 
4710  // Get the first_row for this block_row
4711  unsigned current_block_row_first_row =
4712  matrix_pt(block_row_i, 0)->first_row();
4713 
4714  // Loop through the number of local rows
4715  for (unsigned sub_local_eqn = 0;
4716  sub_local_eqn < current_block_nrow_local;
4717  sub_local_eqn++)
4718  {
4719  // Calculate the corresponding (res_global_eqn) equation number
4720  // for this local row number in this block.
4721  unsigned long res_global_eqn =
4722  sub_local_eqn + current_block_row_first_row + sum_of_block_nrow;
4723 
4724  // Get the processor that this global row belongs to.
4725  // The rank_of_global_row(...) function loops through all the
4726  // processors and does two unsigned comparisons. Since we have to do
4727  // this for every row, it may be better to store a list mapping for
4728  // very large number of processors.
4729  unsigned res_p =
4730  res_distribution_pt->rank_of_global_row(res_global_eqn);
4731 
4732  // With the res_p, we get the res_first_row to
4733  // work out the res_local_eqn
4734  unsigned res_first_row = res_distribution_pt->first_row(res_p);
4735  unsigned res_local_eqn = res_global_eqn - res_first_row;
4736 
4737  // Loop through the block columns, calculate the nnz. This is used
4738  // to reserve space for the value and column_indices Vectors.
4739  unsigned long current_row_nnz = 0;
4740  for (unsigned block_col_i = 0; block_col_i < matrix_ncol;
4741  block_col_i++)
4742  {
4743  // Get the row_start
4744  int* current_block_row_start =
4745  matrix_pt(block_row_i, block_col_i)->row_start();
4746 
4747  // Update the nnz for this row.
4748  current_row_nnz += current_block_row_start[sub_local_eqn + 1] -
4749  current_block_row_start[sub_local_eqn];
4750  } // for block column, get nnz.
4751 
4752  // Reserve space for efficiency.
4753  // unsigned capacity_in_res_p_vec
4754  // = column_indices_to_send[res_p].capacity();
4755 
4756  // Reserve memory for nnz+2, since we need to store the
4757  // res_local_eqn and nnz as well as the data (values/column
4758  // indices). Note: The two reserve functions are called per row. If
4759  // the matrix is very sparse (just a few elements per row), it will
4760  // be more efficient to not reserve and let the STL vector handle
4761  // this. On average, this implementation is more efficient.
4762  // column_indices_to_send[res_p].reserve(capacity_in_res_p_vec
4763  // + current_row_nnz+2);
4764  // values_to_send[res_p].reserve(capacity_in_res_p_vec
4765  // + current_row_nnz+2);
4766 
4767  // Push back the res_local_eqn and nnz
4768  column_indices_to_send[res_p].push_back(res_local_eqn);
4769  column_indices_to_send[res_p].push_back(current_row_nnz);
4770  values_to_send[res_p].push_back(res_local_eqn);
4771  values_to_send[res_p].push_back(current_row_nnz);
4772 
4773  // Loop through the block columns again and get the values
4774  // and column_indices
4775  for (unsigned block_col_i = 0; block_col_i < matrix_ncol;
4776  block_col_i++)
4777  {
4778  // Cache the pointer to the current block for convenience.
4779  CRDoubleMatrix* current_block_pt =
4780  matrix_pt(block_row_i, block_col_i);
4781 
4782  // Values, column indices and row_start for the current block.
4783  double* current_block_values = current_block_pt->value();
4784  int* current_block_column_indices =
4785  current_block_pt->column_index();
4786  int* current_block_row_start = current_block_pt->row_start();
4787 
4788  // Loop though the values and column_indices
4789  for (int val_i = current_block_row_start[sub_local_eqn];
4790  val_i < current_block_row_start[sub_local_eqn + 1];
4791  val_i++)
4792  {
4793  // Push back the value.
4794  values_to_send[res_p].push_back(current_block_values[val_i]);
4795 
4796  // Push back the (offset) column index.
4797  column_indices_to_send[res_p].push_back(
4798  current_block_column_indices[val_i] +
4799  sum_of_ncol_up_to_block[block_col_i]);
4800  } // for block columns
4801  } // for block column, get values and column_indices.
4802  } // for sub_local_eqn
4803 
4804  // update the sum_of_block_nrow
4805  sum_of_block_nrow += matrix_pt(block_row_i, 0)->nrow();
4806 
4807  } // for block_row
4808 
4809  if (enable_timing)
4810  {
4811  double t_prep_data_finish = TimingHelpers::timer();
4812  double t_prep_data_time = t_prep_data_finish - t_prep_data_start;
4813  oomph_info << "Time for prep data: " << t_prep_data_time << std::endl;
4814  }
4815 
4816  // Prepare to send data!
4817 
4818  // Storage for the number of data to be sent to each processor.
4819  Vector<int> send_n(nproc, 0);
4820 
4821  // Storage for all the values/column indices to be sent
4822  // to each processor.
4823  Vector<double> send_values_data;
4824  Vector<unsigned> send_column_indices_data;
4825 
4826  // Storage location within send_values_data
4827  // (and send_column_indices_data) for data to be sent to each processor.
4828  Vector<int> send_displacement(nproc, 0);
4829 
4830  double t_total_ndata_start;
4831  if (enable_timing) t_total_ndata_start = TimingHelpers::timer();
4832 
4833  // Get the total amount of data which needs to be sent, so we can
4834  // reserve space for it.
4835  unsigned total_ndata = 0;
4836  for (unsigned rank = 0; rank < nproc; rank++)
4837  {
4838  if (rank != my_rank)
4839  {
4840  total_ndata += values_to_send[rank].size();
4841  }
4842  }
4843 
4844  if (enable_timing)
4845  {
4846  double t_total_ndata_finish = TimingHelpers::timer();
4847  double t_total_ndata_time =
4848  t_total_ndata_finish - t_total_ndata_start;
4849  oomph_info << "Time for total_ndata: " << t_total_ndata_time
4850  << std::endl;
4851  }
4852 
4853  double t_flat_pack_start;
4854  if (enable_timing) t_flat_pack_start = TimingHelpers::timer();
4855 
4856  // Now we don't have to re-allocate data/memory when push_back is
4857  // called. Nb. Using push_back without reserving memory may cause
4858  // multiple re-allocation behind the scenes, this is expensive.
4859  send_values_data.reserve(total_ndata);
4860  send_column_indices_data.reserve(total_ndata);
4861 
4862  // Loop over all the processors to "flat pack" the data for sending.
4863  for (unsigned rank = 0; rank < nproc; rank++)
4864  {
4865  // Set the offset for the current processor
4866  // This only has to be done once for both values and column indices.
4867  send_displacement[rank] = send_values_data.size();
4868 
4869  // Don't bother to do anything if
4870  // the processor in the loop is the current processor.
4871  if (rank != my_rank)
4872  {
4873  // Put the values into the send data vector.
4874  // n_data is the same for both values and column indices.
4875  unsigned n_data = values_to_send[rank].size();
4876  for (unsigned j = 0; j < n_data; j++)
4877  {
4878  send_values_data.push_back(values_to_send[rank][j]);
4879  send_column_indices_data.push_back(
4880  column_indices_to_send[rank][j]);
4881  } // for
4882  } // if rank != my_rank
4883 
4884  // Find the number of data to be added to the vector.
4885  // send_n is the same for both values and column indices.
4886  send_n[rank] = send_values_data.size() - send_displacement[rank];
4887  } // loop over processors
4888 
4889  if (enable_timing)
4890  {
4891  double t_flat_pack_finish = TimingHelpers::timer();
4892  double t_flat_pack_time = t_flat_pack_finish - t_flat_pack_start;
4893  oomph_info << "t_flat_pack_time: " << t_flat_pack_time << std::endl;
4894  }
4895 
4896  double t_sendn_start;
4897  if (enable_timing) t_sendn_start = TimingHelpers::timer();
4898 
4899  // Strorage for the number of data to be received from each processor
4900  Vector<int> receive_n(nproc, 0);
4901 
4902  MPI_Alltoall(&send_n[0],
4903  1,
4904  MPI_INT,
4905  &receive_n[0],
4906  1,
4907  MPI_INT,
4908  comm_pt->mpi_comm());
4909 
4910  if (enable_timing)
4911  {
4912  double t_sendn_finish = TimingHelpers::timer();
4913  double t_sendn_time = t_sendn_finish - t_sendn_start;
4914  oomph_info << "t_sendn_time: " << t_sendn_time << std::endl;
4915  }
4916 
4917 
4918  // Prepare the data to be received
4919  // by working out the displacement from the received data
4920  // receive_displacement is the same for both values and column indices.
4921  Vector<int> receive_displacement(nproc, 0);
4922  int receive_data_count = 0;
4923  for (unsigned rank = 0; rank < nproc; rank++)
4924  {
4925  receive_displacement[rank] = receive_data_count;
4926  receive_data_count += receive_n[rank];
4927  }
4928 
4929  // Now resize the receive buffer for all data from all processors.
4930  // Make sure that it has a size of at least one.
4931  if (receive_data_count == 0)
4932  {
4933  receive_data_count++;
4934  }
4935  Vector<double> receive_values_data(receive_data_count);
4936  Vector<unsigned> receive_column_indices_data(receive_data_count);
4937 
4938  // Make sure that the send buffer has size at least one
4939  // so that we don't get a segmentation fault.
4940  if (send_values_data.size() == 0)
4941  {
4942  send_values_data.resize(1);
4943  }
4944 
4945  double t_send_data_start;
4946  if (enable_timing) t_send_data_start = TimingHelpers::timer();
4947 
4948  // Now send the data between all processors
4949  MPI_Alltoallv(&send_values_data[0],
4950  &send_n[0],
4951  &send_displacement[0],
4952  MPI_DOUBLE,
4953  &receive_values_data[0],
4954  &receive_n[0],
4955  &receive_displacement[0],
4956  MPI_DOUBLE,
4957  comm_pt->mpi_comm());
4958 
4959  // Now send the data between all processors
4960  MPI_Alltoallv(&send_column_indices_data[0],
4961  &send_n[0],
4962  &send_displacement[0],
4963  MPI_UNSIGNED,
4964  &receive_column_indices_data[0],
4965  &receive_n[0],
4966  &receive_displacement[0],
4967  MPI_UNSIGNED,
4968  comm_pt->mpi_comm());
4969 
4970  if (enable_timing)
4971  {
4972  double t_send_data_finish = TimingHelpers::timer();
4973  double t_send_data_time = t_send_data_finish - t_send_data_start;
4974  oomph_info << "t_send_data_time: " << t_send_data_time << std::endl;
4975  }
4976 
4977  // All the rows for this processor are stored in:
4978  // from other processors:
4979  // receive_column_indices_data and receive_values_data
4980  // from this processor:
4981  // column_indices_to_send[my_rank] and values_to_send[my_rank]
4982  //
4983  // They are in some order determined by the distribution.
4984  // We need to re-arrange them. To do this, we do some pre-processing.
4985 
4986  // nrow_local for this processor.
4987  unsigned res_nrow_local = res_distribution_pt->nrow_local();
4988 
4989  // Per row, store:
4990  // 1) where this row came from, 0 - this proc, 1 - other procs.
4991  // 2) the nnz,
4992  // 3) the offset - where the values/columns in the receive data vectors
4993  // begins. This is different from the offset of where
4994  // the data from a certain processor starts.
4995  Vector<Vector<unsigned>> value_column_locations(res_nrow_local,
4996  Vector<unsigned>(3, 0));
4997 
4998  // Store the local nnz so we can reserve space for
4999  // the values and column indices.
5000  unsigned long res_nnz_local = 0;
5001 
5002  double t_locations_start;
5003  if (enable_timing) t_locations_start = TimingHelpers::timer();
5004 
5005  // Loop through the data currently on this processor.
5006  unsigned location_i = 0;
5007  unsigned my_column_indices_to_send_size =
5008  column_indices_to_send[my_rank].size();
5009  while (location_i < my_column_indices_to_send_size)
5010  {
5011  unsigned current_local_eqn =
5012  column_indices_to_send[my_rank][location_i++];
5013 
5014  unsigned current_nnz = column_indices_to_send[my_rank][location_i++];
5015 
5016  // No need to fill [*][0] with 0 since it is already initialised to 0.
5017 
5018  // Store the nnz.
5019  value_column_locations[current_local_eqn][1] = current_nnz;
5020 
5021  // Also increment the res_local_nnz
5022  res_nnz_local += current_nnz;
5023 
5024  // Store the offset.
5025  value_column_locations[current_local_eqn][2] = location_i;
5026 
5027  // Update the location_i so it starts at the next row.
5028  location_i += current_nnz;
5029  }
5030 
5031  // Loop through the data from different processors.
5032 
5033  // Check to see if data has been received.
5034  bool data_has_been_received = false;
5035  unsigned send_rank = 0;
5036  while (send_rank < nproc)
5037  {
5038  if (receive_n[send_rank] > 0)
5039  {
5040  data_has_been_received = true;
5041  break;
5042  }
5043  send_rank++;
5044  }
5045 
5046  location_i = 0; // start at 0.
5047  if (data_has_been_received)
5048  {
5049  unsigned receive_column_indices_data_size =
5050  receive_column_indices_data.size();
5051  while (location_i < receive_column_indices_data_size)
5052  {
5053  unsigned current_local_eqn =
5054  receive_column_indices_data[location_i++];
5055  unsigned current_nnz = receive_column_indices_data[location_i++];
5056 
5057  // These comes from other processors.
5058  value_column_locations[current_local_eqn][0] = 1;
5059 
5060  // Store the nnz.
5061  value_column_locations[current_local_eqn][1] = current_nnz;
5062 
5063  // Also increment the res_local_nnz
5064  res_nnz_local += current_nnz;
5065 
5066  // Store the offset.
5067  value_column_locations[current_local_eqn][2] = location_i;
5068 
5069  // Update the location_i so it starts at the next row.
5070  location_i += current_nnz;
5071  }
5072  }
5073 
5074  if (enable_timing)
5075  {
5076  double t_locations_finish = TimingHelpers::timer();
5077  double t_locations_time = t_locations_finish - t_locations_start;
5078  oomph_info << "t_locations_time: " << t_locations_time << std::endl;
5079  }
5080 
5081  double t_fillvecs_start;
5082  if (enable_timing) t_fillvecs_start = TimingHelpers::timer();
5083 
5084  // Now loop through the locations and store the values
5085  // the column indices in the correct order.
5086  Vector<int> res_column_indices;
5087  Vector<double> res_values;
5088  Vector<int> res_row_start;
5089 
5090  res_column_indices.reserve(res_nnz_local);
5091  res_values.reserve(res_nnz_local);
5092  res_row_start.reserve(res_nrow_local + 1);
5093 
5094  // Running sum of nnz for the row_start. Must be int because
5095  // res_row_start is templated with int.
5096  int nnz_running_sum = 0;
5097 
5098  // Now insert the rows.
5099  for (unsigned local_row_i = 0; local_row_i < res_nrow_local;
5100  local_row_i++)
5101  {
5102  // Fill the res_row_start with the nnz so far.
5103  res_row_start.push_back(nnz_running_sum);
5104 
5105  bool data_is_from_other_proc =
5106  bool(value_column_locations[local_row_i][0]);
5107 
5108  unsigned row_i_nnz = value_column_locations[local_row_i][1];
5109 
5110  unsigned row_i_offset = value_column_locations[local_row_i][2];
5111 
5112  if (data_is_from_other_proc)
5113  {
5114  // Insert range [offset, offset+nnz) from
5115  // receive_column_indices_data and receive_values_data into
5116  // res_column_indices and res_values respectively.
5117  res_column_indices.insert(
5118  res_column_indices.end(),
5119  receive_column_indices_data.begin() + row_i_offset,
5120  receive_column_indices_data.begin() + row_i_offset + row_i_nnz);
5121 
5122  res_values.insert(res_values.end(),
5123  receive_values_data.begin() + row_i_offset,
5124  receive_values_data.begin() + row_i_offset +
5125  row_i_nnz);
5126  }
5127  else
5128  {
5129  res_column_indices.insert(res_column_indices.end(),
5130  column_indices_to_send[my_rank].begin() +
5131  row_i_offset,
5132  column_indices_to_send[my_rank].begin() +
5133  row_i_offset + row_i_nnz);
5134 
5135  res_values.insert(res_values.end(),
5136  values_to_send[my_rank].begin() + row_i_offset,
5137  values_to_send[my_rank].begin() + row_i_offset +
5138  row_i_nnz);
5139  }
5140 
5141  // Update the running sum of nnz
5142  nnz_running_sum += row_i_nnz;
5143  }
5144 
5145  // Insert the last row_start value
5146  res_row_start.push_back(res_nnz_local);
5147 
5148  if (enable_timing)
5149  {
5150  double t_fillvecs_finish = TimingHelpers::timer();
5151  double t_fillvecs_time = t_fillvecs_finish - t_fillvecs_start;
5152  oomph_info << "t_fillvecs_time: " << t_fillvecs_time << std::endl;
5153  }
5154 
5155  double t_buildres_start;
5156  if (enable_timing) t_buildres_start = TimingHelpers::timer();
5157 
5158  // build the matrix.
5159  result_matrix.build(
5160  res_ncol, res_values, res_column_indices, res_row_start);
5161 
5162  if (enable_timing)
5163  {
5164  double t_buildres_finish = TimingHelpers::timer();
5165  double t_buildres_time = t_buildres_finish - t_buildres_start;
5166  oomph_info << "t_buildres_time: " << t_buildres_time << std::endl;
5167  }
5168  // */
5169 #endif
5170  }
5171  }
5172 
5173  //============================================================================
5174  /// Concatenate CRDoubleMatrix matrices.
5175  ///
5176  /// The Vector row_distribution_pt contains the LinearAlgebraDistribution
5177  /// of each block row.
5178  /// The Vector col_distribution_pt contains the LinearAlgebraDistribution
5179  /// of each block column.
5180  /// The DenseMatrix matrix_pt contains pointers to the CRDoubleMatrices
5181  /// to concatenate.
5182  /// The CRDoubleMatrix result_matrix is the result matrix.
5183  ///
5184  /// The result matrix is a permutation of the sub matrices such that the
5185  /// data stays on the same processor when the result matrix is built, there
5186  /// is no communication between processors. Thus the block structure of the
5187  /// sub matrices are NOT preserved in the result matrix. The rows are
5188  /// block-permuted, defined by the concatenation of the distributions in
5189  /// row_distribution_pt. Similarly, the columns are block-permuted, defined
5190  /// by the concatenation of the distributions in col_distribution_pt. For
5191  /// more details on the block-permutation, see
5192  /// LinearAlgebraDistributionHelpers::concatenate(...).
5193  ///
5194  /// If one wishes to preserve the block structure of the sub matrices in the
5195  /// result matrix, consider using CRDoubleMatrixHelpers::concatenate(...),
5196  /// which uses communication between processors to ensure that the block
5197  /// structure of the sub matrices are preserved.
5198  ///
5199  /// The matrix manipulation functions
5200  /// CRDoubleMatrixHelpers::concatenate(...) and
5201  /// CRDoubleMatrixHelpers::concatenate_without_communication(...)
5202  /// are analogous to the Vector manipulation functions
5203  /// DoubleVectorHelpers::concatenate(...) and
5204  /// DoubleVectorHelpers::concatenate_without_communication(...).
5205  /// Please look at the DoubleVector functions for an illustration of the
5206  /// differences between concatenate(...) and
5207  /// concatenate_without_communication(...).
5208  ///
5209  /// Distribution of the result matrix:
5210  /// If the result matrix does not have a distribution built, then it will be
5211  /// given a distribution built from the concatenation of the distributions
5212  /// from row_distribution_pt, see
5213  /// LinearAlgebraDistributionHelpers::concatenate(...) for more detail.
5214  /// Otherwise we use the existing distribution.
5215  /// If there is an existing distribution then it must be the same as the
5216  /// distribution from the concatenation of row distributions as described
5217  /// above.
5218  /// Why don't we always compute the distribution "on the fly"?
5219  /// Because a non-uniform distribution requires communication.
5220  /// All block preconditioner distributions are concatenations of the
5221  /// distributions of the individual blocks.
5222  //============================================================================
5224  const Vector<LinearAlgebraDistribution*>& row_distribution_pt,
5225  const Vector<LinearAlgebraDistribution*>& col_distribution_pt,
5226  const DenseMatrix<CRDoubleMatrix*>& matrix_pt,
5227  CRDoubleMatrix& result_matrix)
5228  {
5229  // The number of block rows and block columns.
5230  unsigned matrix_nrow = matrix_pt.nrow();
5231  unsigned matrix_ncol = matrix_pt.ncol();
5232 
5233  // PARANOID checks involving in matrices and block_distribution only.
5234  // PARANOID checks involving the result matrix will come later since
5235  // we have to create the result matrix distribution from the in
5236  // distribution if it does not already exist.
5237 #ifdef PARANOID
5238 
5239  // Are there matrices to concatenate?
5240  if (matrix_nrow == 0 || matrix_ncol == 0)
5241  {
5242  std::ostringstream error_message;
5243  error_message << "There are no matrices to concatenate.\n";
5244  throw OomphLibError(error_message.str(),
5245  OOMPH_CURRENT_FUNCTION,
5246  OOMPH_EXCEPTION_LOCATION);
5247  }
5248 
5249  // Does this matrix need concatenating?
5250  if ((matrix_nrow == 1) && (matrix_ncol == 1))
5251  {
5252  std::ostringstream warning_message;
5253  warning_message << "There is only one matrix to concatenate...\n"
5254  << "This does not require concatenating...\n";
5255  OomphLibWarning(warning_message.str(),
5256  OOMPH_CURRENT_FUNCTION,
5257  OOMPH_EXCEPTION_LOCATION);
5258  }
5259 
5260 
5261  // The distribution for each block row is stored in row_distribution_pt.
5262  // So the number of distributions in row_distribution_pt must be the
5263  // same as matrix_nrow.
5264  if (matrix_nrow != row_distribution_pt.size())
5265  {
5266  std::ostringstream error_message;
5267  error_message << "The number of row distributions must be the same as\n"
5268  << "the number of block rows.";
5269  throw OomphLibError(error_message.str(),
5270  OOMPH_CURRENT_FUNCTION,
5271  OOMPH_EXCEPTION_LOCATION);
5272  }
5273 
5274  // The number of distributions for the columns must match the number of
5275  // block columns.
5276  if (matrix_ncol != col_distribution_pt.size())
5277  {
5278  std::ostringstream error_message;
5279  error_message
5280  << "The number of column distributions must be the same as\n"
5281  << "the number of block columns.";
5282  throw OomphLibError(error_message.str(),
5283  OOMPH_CURRENT_FUNCTION,
5284  OOMPH_EXCEPTION_LOCATION);
5285  }
5286 
5287  // Check that all pointers in row_distribution_pt is not null.
5288  for (unsigned block_row_i = 0; block_row_i < matrix_nrow; block_row_i++)
5289  {
5290  if (row_distribution_pt[block_row_i] == 0)
5291  {
5292  std::ostringstream error_message;
5293  error_message << "The row distribution pointer in position "
5294  << block_row_i << " is null.\n";
5295  throw OomphLibError(error_message.str(),
5296  OOMPH_CURRENT_FUNCTION,
5297  OOMPH_EXCEPTION_LOCATION);
5298  }
5299  }
5300 
5301  // Check that all pointers in row_distribution_pt is not null.
5302  for (unsigned block_col_i = 0; block_col_i < matrix_ncol; block_col_i++)
5303  {
5304  if (col_distribution_pt[block_col_i] == 0)
5305  {
5306  std::ostringstream error_message;
5307  error_message << "The column distribution pointer in position "
5308  << block_col_i << " is null.\n";
5309  throw OomphLibError(error_message.str(),
5310  OOMPH_CURRENT_FUNCTION,
5311  OOMPH_EXCEPTION_LOCATION);
5312  }
5313  }
5314 
5315  // Check that all distributions are built.
5316  // First the row distributions
5317  for (unsigned block_row_i = 0; block_row_i < matrix_nrow; block_row_i++)
5318  {
5319  if (!row_distribution_pt[block_row_i]->built())
5320  {
5321  std::ostringstream error_message;
5322  error_message << "The distribution pointer in position "
5323  << block_row_i << " is not built.\n";
5324  throw OomphLibError(error_message.str(),
5325  OOMPH_CURRENT_FUNCTION,
5326  OOMPH_EXCEPTION_LOCATION);
5327  }
5328  }
5329  // Now the column distributions
5330  for (unsigned block_col_i = 0; block_col_i < matrix_ncol; block_col_i++)
5331  {
5332  if (!col_distribution_pt[block_col_i]->built())
5333  {
5334  std::ostringstream error_message;
5335  error_message << "The distribution pointer in position "
5336  << block_col_i << " is not built.\n";
5337  throw OomphLibError(error_message.str(),
5338  OOMPH_CURRENT_FUNCTION,
5339  OOMPH_EXCEPTION_LOCATION);
5340  }
5341  }
5342 
5343  // Check that all communicators in row_distribution_pt are the same.
5344  const OomphCommunicator first_row_comm =
5345  *(row_distribution_pt[0]->communicator_pt());
5346 
5347  for (unsigned block_row_i = 1; block_row_i < matrix_nrow; block_row_i++)
5348  {
5349  const OomphCommunicator current_comm =
5350  *(row_distribution_pt[block_row_i]->communicator_pt());
5351 
5352  if (first_row_comm != current_comm)
5353  {
5354  std::ostringstream error_message;
5355  error_message
5356  << "The communicator from the row distribution in position "
5357  << block_row_i << " is not the same as the first "
5358  << "communicator from row_distribution_pt";
5359  throw OomphLibError(error_message.str(),
5360  OOMPH_CURRENT_FUNCTION,
5361  OOMPH_EXCEPTION_LOCATION);
5362  }
5363  }
5364 
5365  // Check that all communicators in col_distribution_pt are the same as the
5366  // first row communicator from above.
5367  for (unsigned block_col_i = 0; block_col_i < matrix_ncol; block_col_i++)
5368  {
5369  const OomphCommunicator current_comm =
5370  *(col_distribution_pt[block_col_i]->communicator_pt());
5371 
5372  if (first_row_comm != current_comm)
5373  {
5374  std::ostringstream error_message;
5375  error_message
5376  << "The communicator from the col distribution in position "
5377  << block_col_i << " is not the same as the first "
5378  << "communicator from row_distribution_pt";
5379  throw OomphLibError(error_message.str(),
5380  OOMPH_CURRENT_FUNCTION,
5381  OOMPH_EXCEPTION_LOCATION);
5382  }
5383  }
5384 
5385  // Are all sub matrices built? If the matrix_pt is not null, make sure
5386  // that it is built.
5387  for (unsigned block_row_i = 0; block_row_i < matrix_nrow; block_row_i++)
5388  {
5389  for (unsigned block_col_i = 0; block_col_i < matrix_ncol; block_col_i++)
5390  {
5391  if (matrix_pt(block_row_i, block_col_i) != 0 &&
5392  !(matrix_pt(block_row_i, block_col_i)->built()))
5393  {
5394  std::ostringstream error_message;
5395  error_message << "The sub matrix_pt(" << block_row_i << ","
5396  << block_col_i << ")\n"
5397  << "is not built.\n";
5398  throw OomphLibError(error_message.str(),
5399  OOMPH_CURRENT_FUNCTION,
5400  OOMPH_EXCEPTION_LOCATION);
5401  }
5402  }
5403  }
5404 
5405  // For the matrices which are built, do they have the same communicator as
5406  // the first communicator from row_distribution_pt?
5407  for (unsigned block_row_i = 0; block_row_i < matrix_nrow; block_row_i++)
5408  {
5409  for (unsigned block_col_i = 0; block_col_i < matrix_ncol; block_col_i++)
5410  {
5411  if (matrix_pt(block_row_i, block_col_i) != 0)
5412  {
5413  const OomphCommunicator current_comm =
5414  *(matrix_pt(block_row_i, block_col_i)
5415  ->distribution_pt()
5416  ->communicator_pt());
5417  if (first_row_comm != current_comm)
5418  {
5419  std::ostringstream error_message;
5420  error_message
5421  << "The sub matrix_pt(" << block_row_i << "," << block_col_i
5422  << ")\n"
5423  << "does not have the same communicator pointer as those in\n"
5424  << "(row|col)_distribution_pt.\n";
5425  throw OomphLibError(error_message.str(),
5426  OOMPH_CURRENT_FUNCTION,
5427  OOMPH_EXCEPTION_LOCATION);
5428  }
5429  }
5430  }
5431  }
5432 
5433  // Do all dimensions of sub matrices "make sense"?
5434  // Compare the number of rows of each block matrix in a block row.
5435  for (unsigned block_row_i = 0; block_row_i < matrix_nrow; block_row_i++)
5436  {
5437  // Use the first column to compare against the rest.
5438  unsigned long current_block_nrow =
5439  row_distribution_pt[block_row_i]->nrow();
5440 
5441  // Compare against columns 0 to matrix_ncol - 1
5442  for (unsigned block_col_i = 0; block_col_i < matrix_ncol; block_col_i++)
5443  {
5444  // Perform the check if the matrix_pt is not null.
5445  if (matrix_pt(block_row_i, block_col_i) != 0)
5446  {
5447  // Get the nrow for this sub block.
5448  unsigned long subblock_nrow =
5449  matrix_pt(block_row_i, block_col_i)->nrow();
5450 
5451  if (current_block_nrow != subblock_nrow)
5452  {
5453  std::ostringstream error_message;
5454  error_message << "The sub matrix (" << block_row_i << ","
5455  << block_col_i << ")\n"
5456  << "requires nrow = " << current_block_nrow
5457  << ", but has nrow = " << subblock_nrow << ".\n"
5458  << "Either the row_distribution_pt is incorrect or "
5459  << "the sub matrices are incorrect.\n";
5460  throw OomphLibError(error_message.str(),
5461  OOMPH_CURRENT_FUNCTION,
5462  OOMPH_EXCEPTION_LOCATION);
5463  }
5464  }
5465  }
5466  }
5467 
5468  // Compare the number of columns of each block matrix in a block column.
5469  for (unsigned block_col_i = 0; block_col_i < matrix_ncol; block_col_i++)
5470  {
5471  // Get the current block ncol from the linear algebra distribution.
5472  // Note that we assume that the dimensions are symmetrical.
5473  unsigned current_block_ncol = col_distribution_pt[block_col_i]->nrow();
5474 
5475  for (unsigned block_row_i = 0; block_row_i < matrix_nrow; block_row_i++)
5476  {
5477  if (matrix_pt(block_row_i, block_col_i) != 0)
5478  {
5479  // Get the ncol for this sub block.
5480  unsigned subblock_ncol =
5481  matrix_pt(block_row_i, block_col_i)->ncol();
5482 
5483  if (current_block_ncol != subblock_ncol)
5484  {
5485  std::ostringstream error_message;
5486  error_message << "The sub matrix (" << block_row_i << ","
5487  << block_col_i << ")\n"
5488  << "requires ncol = " << current_block_ncol
5489  << ", but has ncol = " << subblock_ncol << ".\n"
5490  << "Either the col_distribution_pt is incorrect or "
5491  << "the sub matrices are incorrect.\n";
5492  throw OomphLibError(error_message.str(),
5493  OOMPH_CURRENT_FUNCTION,
5494  OOMPH_EXCEPTION_LOCATION);
5495  }
5496  }
5497  }
5498  }
5499 
5500  // Ensure that the distributions for all sub matrices in the same block
5501  // row are the same. This is because we permute the row across several
5502  // matrices.
5503 
5504  // Loop through each block row.
5505  for (unsigned block_row_i = 0; block_row_i < matrix_nrow; block_row_i++)
5506  {
5507  // Get the distribution from the first block in this row.
5508  LinearAlgebraDistribution* block_row_distribution_pt =
5509  row_distribution_pt[block_row_i];
5510 
5511  // Loop through the block columns
5512  for (unsigned block_col_i = 0; block_col_i < matrix_ncol; block_col_i++)
5513  {
5514  if (matrix_pt(block_row_i, block_col_i) != 0)
5515  {
5516  // Get the distribution for this block.
5517  LinearAlgebraDistribution* current_block_distribution_pt =
5518  matrix_pt(block_row_i, block_col_i)->distribution_pt();
5519 
5520  // Ensure that the in matrices is a square block matrix.
5521  if ((*block_row_distribution_pt) !=
5522  (*current_block_distribution_pt))
5523  {
5524  std::ostringstream error_message;
5525  error_message
5526  << "Sub block(" << block_row_i << "," << block_col_i << ")"
5527  << "does not have the same distributoin as the first"
5528  << "block in this block row.\n"
5529  << "All distributions on a block row must be the same"
5530  << "for this function to concatenate matrices.\n";
5531  throw OomphLibError(error_message.str(),
5532  OOMPH_CURRENT_FUNCTION,
5533  OOMPH_EXCEPTION_LOCATION);
5534  }
5535  }
5536  }
5537  }
5538 #endif
5539 
5540  // The communicator pointer from the first row_distribution_pt
5541  const OomphCommunicator* const comm_pt =
5542  row_distribution_pt[0]->communicator_pt();
5543 
5544  // Renamed for so it makes more sense.
5545  unsigned nblock_row = matrix_nrow;
5546 
5547  // If the result matrix does not have a distribution, then we concatenate
5548  // the sub matrix distributions.
5549  if (!result_matrix.distribution_pt()->built())
5550  {
5551  // The result distribution
5552  LinearAlgebraDistribution tmp_distribution;
5554  tmp_distribution);
5555 
5556  result_matrix.build(&tmp_distribution);
5557  }
5558  else
5559  // A distribution is supplied for the result matrix.
5560  {
5561 #ifdef PARANOID
5562  // Check that the result distribution is a concatenation of the
5563  // distributions of the sub matrices.
5564 
5565  LinearAlgebraDistribution wanted_distribution;
5566 
5568  wanted_distribution);
5569 
5570  if (*(result_matrix.distribution_pt()) != wanted_distribution)
5571  {
5572  std::ostringstream error_message;
5573  error_message
5574  << "The result distribution is not correct.\n"
5575  << "Please call the function without a result\n"
5576  << "distribution (clear the result matrix) or check the\n"
5577  << "distribution of the result matrix.\n"
5578  << "The result distribution must be the same as the one \n"
5579  << "created by\n"
5580  << "LinearAlgebraDistributionHelpers::concatenate(...)";
5581  throw OomphLibError(error_message.str(),
5582  OOMPH_CURRENT_FUNCTION,
5583  OOMPH_EXCEPTION_LOCATION);
5584  }
5585 #endif
5586  }
5587 
5588  // The rest of the paranoid checks.
5589 #ifdef PARANOID
5590 
5591  // Make sure that the communicator from the result matrix is the same as
5592  // all the others. This test is redundant if this function created the
5593  // result matrix distribution, since then it is guaranteed that the
5594  // communicators are the same.
5595  {
5596  // Communicator from the result matrix.
5597  const OomphCommunicator res_comm =
5598  *(result_matrix.distribution_pt()->communicator_pt());
5599 
5600  // Is the result communicator pointer the same as the others?
5601  // Since we have already tested the others, we only need to compare
5602  // against one of them. Say the first communicator from
5603  // row_distribution_pt.
5604  const OomphCommunicator first_comm =
5605  *(row_distribution_pt[0]->communicator_pt());
5606 
5607  if (res_comm != first_comm)
5608  {
5609  std::ostringstream error_message;
5610  error_message << "The OomphCommunicator of the result matrix is not "
5611  "the same as the "
5612  << "others!";
5613  throw OomphLibError(error_message.str(),
5614  OOMPH_CURRENT_FUNCTION,
5615  OOMPH_EXCEPTION_LOCATION);
5616  }
5617  }
5618 
5619  // Are all the distributed boolean the same? This only applies if we have
5620  // more than one processor. If there is only one processor, then it does
5621  // not matter if it is distributed or not - they are conceptually the
5622  // same.
5623  if (comm_pt->nproc() != 1)
5624  {
5625  // Compare distributed for sub matrices (against the result matrix).
5626  const bool res_distributed = result_matrix.distributed();
5627 
5628  // Loop over all sub blocks.
5629  for (unsigned block_row_i = 0; block_row_i < matrix_nrow; block_row_i++)
5630  {
5631  for (unsigned block_col_i = 0; block_col_i < matrix_ncol;
5632  block_col_i++)
5633  {
5634  if (matrix_pt(block_row_i, block_col_i) != 0)
5635  {
5636  const bool another_distributed =
5637  matrix_pt(block_row_i, block_col_i)->distributed();
5638 
5639  if (res_distributed != another_distributed)
5640  {
5641  std::ostringstream error_message;
5642  error_message << "The distributed boolean of the sub matrix ("
5643  << block_row_i << "," << block_col_i << ")\n"
5644  << "is not the same as the result matrix. \n";
5645  throw OomphLibError(error_message.str(),
5646  OOMPH_CURRENT_FUNCTION,
5647  OOMPH_EXCEPTION_LOCATION);
5648  }
5649  }
5650  }
5651  }
5652 
5653  // Do this test for row_distribution_pt
5654  const bool first_row_distribution_distributed =
5655  row_distribution_pt[0]->distributed();
5656 
5657  for (unsigned block_row_i = 1; block_row_i < matrix_nrow; block_row_i++)
5658  {
5659  const bool another_distributed =
5660  row_distribution_pt[block_row_i]->distributed();
5661 
5662  if (first_row_distribution_distributed != another_distributed)
5663  {
5664  std::ostringstream error_message;
5665  error_message
5666  << "The distributed boolean of row_distribution_pt["
5667  << block_row_i << "]\n"
5668  << "is not the same as the one from row_distribution_pt[0]. \n";
5669  throw OomphLibError(error_message.str(),
5670  OOMPH_CURRENT_FUNCTION,
5671  OOMPH_EXCEPTION_LOCATION);
5672  }
5673  }
5674 
5675  // Repeat for col_distribution_pt
5676  for (unsigned block_col_i = 0; block_col_i < matrix_ncol; block_col_i++)
5677  {
5678  const bool another_distributed =
5679  col_distribution_pt[block_col_i]->distributed();
5680 
5681  if (first_row_distribution_distributed != another_distributed)
5682  {
5683  std::ostringstream error_message;
5684  error_message
5685  << "The distributed boolean of col_distribution_pt["
5686  << block_col_i << "]\n"
5687  << "is not the same as the one from row_distribution_pt[0]. \n";
5688  throw OomphLibError(error_message.str(),
5689  OOMPH_CURRENT_FUNCTION,
5690  OOMPH_EXCEPTION_LOCATION);
5691  }
5692  }
5693  }
5694 #endif
5695 
5696  /// /////////////// END OF PARANOID TESTS
5697  /// ////////////////////////////////////
5698 
5699  // The number of processors.
5700  unsigned nproc = comm_pt->nproc();
5701 
5702  // Cache the result distribution pointer for convenience.
5703  LinearAlgebraDistribution* res_distribution_pt =
5704  result_matrix.distribution_pt();
5705 
5706  // nrow_local for the result matrix
5707  unsigned res_nrow_local = res_distribution_pt->nrow_local();
5708 
5709  // renamed for readability.
5710  unsigned nblock_col = matrix_ncol;
5711 
5712  // construct the block offset
5713  // DenseMatrix<unsigned> col_offset(nproc,nblock_col,0);
5714  std::vector<std::vector<unsigned>> col_offset(
5715  nproc, std::vector<unsigned>(nblock_col));
5716  unsigned off = 0;
5717  for (unsigned proc_i = 0; proc_i < nproc; proc_i++)
5718  {
5719  for (unsigned block_i = 0; block_i < nblock_col; block_i++)
5720  {
5721  col_offset[proc_i][block_i] = off;
5722  off += col_distribution_pt[block_i]->nrow_local(proc_i);
5723  }
5724  }
5725 
5726  // Do some pre-processing for the processor number a global row number is
5727  // on. This is required when permuting the column entries.
5728  // We need to do this for each distribution, so we have a vector of
5729  // vectors. First index corresponds to the distribution, the second is
5730  // the processor number.
5731  std::vector<std::vector<unsigned>> p_for_rows(nblock_col,
5732  std::vector<unsigned>());
5733  // initialise 2D vector
5734  for (unsigned blocki = 0; blocki < nblock_col; blocki++)
5735  {
5736  int blockinrow = col_distribution_pt[blocki]->nrow();
5737  p_for_rows[blocki].resize(blockinrow);
5738  // FOR each global index in the block, work out the corresponding proc.
5739  for (int rowi = 0; rowi < blockinrow; rowi++)
5740  {
5741  unsigned p = 0;
5742  int b_first_row = col_distribution_pt[blocki]->first_row(p);
5743  int b_nrow_local = col_distribution_pt[blocki]->nrow_local(p);
5744 
5745  while (rowi < b_first_row || rowi >= b_nrow_local + b_first_row)
5746  {
5747  p++;
5748  b_first_row = col_distribution_pt[blocki]->first_row(p);
5749  b_nrow_local = col_distribution_pt[blocki]->nrow_local(p);
5750  }
5751  p_for_rows[blocki][rowi] = p;
5752  }
5753  }
5754 
5755  // determine nnz of all blocks on this processor only.
5756  // This is used to create storage space.
5757  unsigned long res_nnz = 0;
5758  for (unsigned row_i = 0; row_i < nblock_row; row_i++)
5759  {
5760  for (unsigned col_i = 0; col_i < nblock_col; col_i++)
5761  {
5762  if (matrix_pt(row_i, col_i) != 0)
5763  {
5764  res_nnz += matrix_pt(row_i, col_i)->nnz();
5765  }
5766  }
5767  }
5768 
5769  // My rank
5770  // unsigned my_rank = comm_pt->my_rank();
5771  // my_rank = my_rank;
5772 
5773  // Turn the above into a string.
5774  // std::ostringstream myrankstream;
5775  // myrankstream << "THISDOESNOTHINGnp" << my_rank << std::endl;
5776  // std::string myrankstring = myrankstream.str();
5777 
5778 
5779  // CALLGRIND_ZERO_STATS;
5780  // CALLGRIND_START_INSTRUMENTATION;
5781 
5782  // storage for the result matrix.
5783  int* res_row_start = new int[res_nrow_local + 1];
5784  int* res_column_index = new int[res_nnz];
5785  double* res_value = new double[res_nnz];
5786 
5787  // initialise the zero-th entry
5788  res_row_start[0] = 0;
5789 
5790  // loop over the block rows
5791  unsigned long res_i = 0; // index for the result matrix.
5792  unsigned long res_row_i = 0; // index for the row
5793  for (unsigned i = 0; i < nblock_row; i++)
5794  {
5795  // loop over the rows of the current block local rows.
5796  unsigned block_nrow = row_distribution_pt[i]->nrow_local();
5797  for (unsigned k = 0; k < block_nrow; k++)
5798  {
5799  // initialise res_row_start
5800  res_row_start[res_row_i + 1] = res_row_start[res_row_i];
5801 
5802  // Loop over the block columns
5803  for (unsigned j = 0; j < nblock_col; j++)
5804  {
5805  // if block(i,j) pointer is not null then
5806  if (matrix_pt(i, j) != 0)
5807  {
5808  // get pointers for the elements in the current block
5809  int* b_row_start = matrix_pt(i, j)->row_start();
5810  int* b_column_index = matrix_pt(i, j)->column_index();
5811  double* b_value = matrix_pt(i, j)->value();
5812 
5813  // memcpy( &dst[dstIdx], &src[srcIdx], numElementsToCopy * sizeof(
5814  // Element ) );
5815  // no ele to copy
5816  int numEleToCopy = b_row_start[k + 1] - b_row_start[k];
5817  memcpy(res_value + res_i,
5818  b_value + b_row_start[k],
5819  numEleToCopy * sizeof(double));
5820  // Loop through the current local row.
5821  for (int l = b_row_start[k]; l < b_row_start[k + 1]; l++)
5822  {
5823  // if b_column_index[l] was a row index, what processor
5824  // would it be on
5825  // unsigned p = col_distribution_pt[j]
5826  // ->rank_of_global_row_map(b_column_index[l]);
5827  unsigned p = p_for_rows[j][b_column_index[l]];
5828 
5829  int b_first_row = col_distribution_pt[j]->first_row(p);
5830  // int b_nrow_local =
5831  // col_distribution_pt[j]->nrow_local(p);
5832 
5833  // while (b_column_index[l] < b_first_row ||
5834  // b_column_index[l] >=
5835  // b_nrow_local+b_first_row)
5836  // {
5837  // p++;
5838  // b_first_row =
5839  // col_distribution_pt[j]->first_row(p);
5840  // b_nrow_local =
5841  // col_distribution_pt[j]->nrow_local(p);
5842  // }
5843 
5844  // determine the local equation number in the block j/processor
5845  // p "column block"
5846  int eqn = b_column_index[l] - b_first_row;
5847 
5848  // add to the result matrix
5849  // res_value[res_i] = b_value[l];
5850  res_column_index[res_i] = col_offset[p][j] + eqn;
5851  res_row_start[res_row_i + 1]++;
5852  res_i++;
5853  }
5854  }
5855  }
5856 
5857  // increment the row pt
5858  res_row_i++;
5859  }
5860  }
5861  // CALLGRIND_STOP_INSTRUMENTATION;
5862  // CALLGRIND_DUMP_STATS_AT(myrankstring.c_str());
5863 
5864 
5865  // Get the number of columns of the result matrix.
5866  unsigned res_ncol = 0;
5867  for (unsigned block_col_i = 0; block_col_i < matrix_ncol; block_col_i++)
5868  {
5869  res_ncol += col_distribution_pt[block_col_i]->nrow();
5870  }
5871 
5872  // Build the result matrix.
5873  result_matrix.build_without_copy(
5874  res_ncol, res_nnz, res_value, res_column_index, res_row_start);
5875  }
5876 
5877 
5878  //============================================================================
5879  /// Concatenate CRDoubleMatrix matrices.
5880  /// This calls the other concatenate_without_communication(...) function,
5881  /// passing block_distribution_pt as both the row_distribution_pt and
5882  /// col_distribution_pt. This should only be called for block square
5883  /// matrices.
5884  //============================================================================
5886  const Vector<LinearAlgebraDistribution*>& block_distribution_pt,
5887  const DenseMatrix<CRDoubleMatrix*>& matrix_pt,
5888  CRDoubleMatrix& result_matrix)
5889  {
5890 #ifdef PARANOID
5891  // The number of block rows and block columns.
5892  unsigned matrix_nrow = matrix_pt.nrow();
5893  unsigned matrix_ncol = matrix_pt.ncol();
5894 
5895  // Are there matrices to concatenate?
5896  if (matrix_nrow == 0)
5897  {
5898  std::ostringstream error_message;
5899  error_message << "There are no matrices to concatenate.\n";
5900  throw OomphLibError(error_message.str(),
5901  OOMPH_CURRENT_FUNCTION,
5902  OOMPH_EXCEPTION_LOCATION);
5903  }
5904 
5905  // Ensure that the sub matrices is a square block matrix.
5906  if (matrix_nrow != matrix_ncol)
5907  {
5908  std::ostringstream error_message;
5909  error_message
5910  << "The number of block rows and block columns\n"
5911  << "must be the same. Otherwise, call the other\n"
5912  << "concatenate_without_communication function, passing in\n"
5913  << "a Vector of distributions describing how to permute the\n"
5914  << "columns.";
5915  throw OomphLibError(error_message.str(),
5916  OOMPH_CURRENT_FUNCTION,
5917  OOMPH_EXCEPTION_LOCATION);
5918  }
5919 #endif
5920 
5922  block_distribution_pt, block_distribution_pt, matrix_pt, result_matrix);
5923  }
5924 
5925  } // namespace CRDoubleMatrixHelpers
5926 
5927 } // namespace oomph
cstr elem_len * i
Definition: cfortran.h:603
//////////////////////////////////////////////////////////////// ////////////////////////////////////...
Definition: matrices.h:2791
void multiply_transpose(const DoubleVector &x, DoubleVector &soln) const
Multiply the transposed matrix by the vector x: soln=A^T x.
Definition: matrices.cc:715
virtual void lubksub(DoubleVector &rhs)
LU back solve for given RHS.
Definition: matrices.cc:614
void matrix_reduction(const double &alpha, CCDoubleMatrix &reduced_matrix)
For every row, find the maximum absolute value of the entries in this row. Set all values that are le...
Definition: matrices.cc:1149
virtual ~CCDoubleMatrix()
Destructor: Kill the LU factors if they have been setup.
Definition: matrices.cc:597
unsigned long ncol() const
Return the number of columns of the matrix.
Definition: matrices.h:2823
unsigned long nrow() const
Return the number of rows of the matrix.
Definition: matrices.h:2817
CCDoubleMatrix()
Default constructor.
Definition: matrices.cc:572
virtual void ludecompose()
LU decomposition using SuperLU.
Definition: matrices.cc:606
void multiply(const DoubleVector &x, DoubleVector &soln) const
Multiply the matrix by the vector x: soln=Ax.
Definition: matrices.cc:622
unsigned Matrix_matrix_multiply_method
Flag to determine which matrix-matrix multiplication method is used.
Definition: matrices.h:2893
/////////////////////////////////////////////////////////////// /////////////////////////////////////...
Definition: matrices.h:2585
void build_without_copy(T *value, int *row_index, int *column_start, const unsigned long &nnz, const unsigned long &n, const unsigned long &m)
Function to build matrix from pointers to arrays which hold the column starts, row indices and non-ze...
Definition: matrices.h:3199
int * Column_start
Start index for column.
Definition: matrices.h:2779
int * Row_index
Row index.
Definition: matrices.h:2776
int * column_start()
Access to C-style column_start array.
Definition: matrices.h:2692
void build(const Vector< double > &value, const Vector< int > &row_index, const Vector< int > &column_start, const unsigned long &n, const unsigned long &m)
Build matrix from compressed representation. Number of nonzero entries is read off from value,...
Definition: matrices.h:3247
int * row_index()
Access to C-style row index array.
Definition: matrices.h:2704
A class for compressed row matrices. This is a distributable object.
Definition: matrices.h:888
void sort_entries()
Sorts the entries associated with each row of the matrix in the column index vector and the value vec...
Definition: matrices.cc:1449
int * column_index()
Access to C-style column index array.
Definition: matrices.h:1072
int * row_start()
Access to C-style row_start array.
Definition: matrices.h:1060
virtual ~CRDoubleMatrix()
Destructor.
Definition: matrices.cc:1343
struct oomph::CRDoubleMatrix::CRDoubleMatrixComparisonHelper Comparison_struct
void matrix_reduction(const double &alpha, CRDoubleMatrix &reduced_matrix)
For every row, find the maximum absolute value of the entries in this row. Set all values that are le...
Definition: matrices.cc:2365
void multiply_transpose(const DoubleVector &x, DoubleVector &soln) const
Multiply the transposed matrix by the vector x: soln=A^T x.
Definition: matrices.cc:1882
virtual void ludecompose()
LU decomposition using SuperLU if matrix is not distributed or distributed onto a single processor.
Definition: matrices.cc:1728
unsigned long ncol() const
Return the number of columns of the matrix.
Definition: matrices.h:1008
void multiply(const DoubleVector &x, DoubleVector &soln) const
Multiply the matrix by the vector x: soln=Ax.
Definition: matrices.cc:1782
void add(const CRDoubleMatrix &matrix_in, CRDoubleMatrix &result_matrix) const
element-wise addition of this matrix with matrix_in.
Definition: matrices.cc:3515
bool Built
Flag to indicate whether the matrix has been built - i.e. the distribution has been setup AND the mat...
Definition: matrices.h:1253
Vector< int > Index_of_diagonal_entries
Vector whose i'th entry contains the index of the last entry below or on the diagonal of the i'th row...
Definition: matrices.h:1238
double inf_norm() const
returns the inf-norm of this matrix
Definition: matrices.cc:3412
void get_matrix_transpose(CRDoubleMatrix *result) const
Returns the transpose of this matrix.
Definition: matrices.cc:3271
unsigned long nnz() const
Return the number of nonzero entries (the local nnz)
Definition: matrices.h:1096
CRDoubleMatrix * global_matrix() const
if this matrix is distributed then a the equivalent global matrix is built using new and returned....
Definition: matrices.cc:2431
void redistribute(const LinearAlgebraDistribution *const &dist_pt)
The contents of the matrix are redistributed to match the new distribution. In a non-MPI build this m...
Definition: matrices.cc:2575
bool built() const
access function to the Built flag - indicates whether the matrix has been build - i....
Definition: matrices.h:1210
void build_without_copy(const unsigned &ncol, const unsigned &nnz, double *value, int *column_index, int *row_start)
keeps the existing distribution and just matrix that is stored without copying the matrix data
Definition: matrices.cc:1710
double * value()
Access to C-style value array.
Definition: matrices.h:1084
unsigned long nrow() const
Return the number of rows of the matrix.
Definition: matrices.h:1002
unsigned Serial_matrix_matrix_multiply_method
Flag to determine which matrix-matrix multiplication method is used (for serial (or global) matrices)
Definition: matrices.h:1242
Vector< double > diagonal_entries() const
returns a Vector of diagonal entries of this matrix. This only works with square matrices....
Definition: matrices.cc:3465
CRDoubleMatrix()
Default constructor.
Definition: matrices.cc:1214
bool entries_are_sorted(const bool &doc_unordered_entries=false) const
Runs through the column index vector and checks if the entries follow the regular lexicographical ord...
Definition: matrices.cc:1366
CRMatrix< double > CR_matrix
Storage for the Matrix in CR Format.
Definition: matrices.h:1249
virtual void lubksub(DoubleVector &rhs)
LU back solve for given RHS.
Definition: matrices.cc:1749
void build(const LinearAlgebraDistribution *distribution_pt, const unsigned &ncol, const Vector< double > &value, const Vector< int > &column_index, const Vector< int > &row_start)
build method: vector of values, vector of column indices, vector of row starts and number of rows and...
Definition: matrices.cc:1672
void clean_up_memory()
Wipe matrix data and set all values to 0.
Definition: matrices.h:3323
int * row_start()
Access to C-style row_start array.
Definition: matrices.h:785
void build(const Vector< T > &value, const Vector< int > &column_index, const Vector< int > &row_start, const unsigned long &n, const unsigned long &m)
Build matrix from compressed representation. Number of nonzero entries is read off from value,...
Definition: matrices.h:3404
int * column_index()
Access to C-style column index array.
Definition: matrices.h:797
void build_without_copy(T *value, int *column_index, int *row_start, const unsigned long &nnz, const unsigned long &n, const unsigned long &m)
Function to build matrix from pointers to arrays which hold the row starts, column indices and non-ze...
Definition: matrices.h:3354
T get_entry(const unsigned long &i, const unsigned long &j) const
Access function that will be called by the read-only round-bracket operator (const)
Definition: matrices.h:748
Class of matrices containing doubles, and stored as a DenseMatrix<double>, but with solving functiona...
Definition: matrices.h:1271
DenseDoubleMatrix()
Constructor, set the default linear solver.
Definition: matrices.cc:139
virtual void lubksub(DoubleVector &rhs)
LU backsubstitution.
Definition: matrices.cc:202
virtual ~DenseDoubleMatrix()
Destructor.
Definition: matrices.cc:182
void matrix_reduction(const double &alpha, DenseDoubleMatrix &reduced_matrix)
For every row, find the maximum absolute value of the entries in this row. Set all values that are le...
Definition: matrices.cc:481
virtual void ludecompose()
LU decomposition using DenseLU (default linea solver)
Definition: matrices.cc:192
void multiply_transpose(const DoubleVector &x, DoubleVector &soln) const
Multiply the transposed matrix by the vector x: soln=A^T x.
Definition: matrices.cc:385
void multiply(const DoubleVector &x, DoubleVector &soln) const
Multiply the matrix by the vector x: soln=Ax.
Definition: matrices.cc:294
unsigned long nrow() const
Return the number of rows of the matrix.
Definition: matrices.h:1295
unsigned long ncol() const
Return the number of columns of the matrix.
Definition: matrices.h:1301
void eigenvalues_by_jacobi(Vector< double > &eigen_val, DenseMatrix< double > &eigen_vect) const
Determine eigenvalues and eigenvectors, using Jacobi rotations. Only for symmetric matrices....
Definition: matrices.cc:224
Dense LU decomposition-based solve of full assembled linear system. VERY inefficient but useful to il...
//////////////////////////////////////////////////////////////////////////// ////////////////////////...
Definition: matrices.h:386
unsigned long nrow() const
Return the number of rows of the matrix.
Definition: matrices.h:485
unsigned long N
Number of rows.
Definition: matrices.h:392
double * Matrixdata
Internal representation of matrix as a pointer to data.
Definition: matrices.h:389
unsigned long ncol() const
Return the number of columns of the matrix.
Definition: matrices.h:491
unsigned long M
Number of columns.
Definition: matrices.h:395
void resize(const unsigned long &n)
Resize to a square nxn matrix; any values already present will be transfered.
Definition: matrices.h:498
void clear_distribution()
clear the distribution of this distributable linear algebra object
bool distributed() const
distribution is serial or distributed
LinearAlgebraDistribution * distribution_pt() const
access to the LinearAlgebraDistribution
unsigned nrow() const
access function to the number of global rows.
bool distribution_built() const
if the communicator_pt is null then the distribution is not setup then false is returned,...
unsigned nrow_local() const
access function for the num of local rows on this processor.
unsigned first_row() const
access function for the first row on this processor
void build_distribution(const LinearAlgebraDistribution *const dist_pt)
setup the distribution of this distributable linear algebra object
void solve(DoubleVector &rhs)
Complete LU solve (replaces matrix by its LU decomposition and overwrites RHS with solution)....
Definition: matrices.cc:50
LinearSolver * Default_linear_solver_pt
Definition: matrices.h:267
LinearSolver * Linear_solver_pt
Definition: matrices.h:264
A vector in the mathematical sense, initially developed for linear algebra type applications....
Definition: double_vector.h:58
void initialise(const double &v)
initialise the whole vector with value v
void build(const DoubleVector &old_vector)
Just copys the argument DoubleVector.
double * values_pt()
access function to the underlying values
bool built() const
Describes the distribution of a distributable linear algebra type object. Typically this is a contain...
bool distributed() const
access function to the distributed - indicates whether the distribution is serial or distributed
unsigned first_row() const
access function for the first row on this processor. If not distributed then this is just zero.
OomphCommunicator * communicator_pt() const
const access to the communicator pointer
void build(const OomphCommunicator *const comm_pt, const unsigned &first_row, const unsigned &nrow_local, const unsigned &nrow=0)
Sets the distribution. Takes first_row, nrow_local and nrow as arguments. If nrow is not provided or ...
bool built() const
if the communicator_pt is null then the distribution is not setup then false is returned,...
unsigned nrow() const
access function to the number of global rows.
unsigned nrow_local() const
access function for the num of local rows on this processor. If no MPI then Nrow is returned.
unsigned rank_of_global_row(const unsigned i) const
return the processor rank of the global row number i
virtual void solve(Problem *const &problem_pt, DoubleVector &result)=0
Solver: Takes pointer to problem and returns the results vector which contains the solution of the li...
virtual void clean_up_memory()
Empty virtual function that can be overloaded in specific linear solvers to clean up any memory that ...
An oomph-lib wrapper to the MPI_Comm communicator object. Just contains an MPI_Comm object (which is ...
Definition: communicator.h:54
An OomphLibError object which should be thrown when an run-time error is encountered....
An OomphLibWarning object which should be created as a temporary object to issue a warning....
unsigned long Nnz
Number of non-zero values (i.e. size of Value array)
Definition: matrices.h:574
T * value()
Access to C-style value array.
Definition: matrices.h:616
T * Value
Internal representation of the matrix values, a pointer.
Definition: matrices.h:565
unsigned long N
Number of rows.
Definition: matrices.h:568
unsigned long M
Number of columns.
Definition: matrices.h:571
//////////////////////////////////////////////////////////////////////////// ////////////////////////...
double gershgorin_eigenvalue_estimate(const DenseMatrix< CRDoubleMatrix * > &matrix_pt)
Calculates the largest Gershgorin disc whilst preserving the sign. Let A be an n by n matrix,...
Definition: matrices.cc:4003
void concatenate_without_communication(const Vector< LinearAlgebraDistribution * > &row_distribution_pt, const Vector< LinearAlgebraDistribution * > &col_distribution_pt, const DenseMatrix< CRDoubleMatrix * > &matrix_pt, CRDoubleMatrix &result_matrix)
Concatenate CRDoubleMatrix matrices.
Definition: matrices.cc:5223
double inf_norm(const DenseMatrix< CRDoubleMatrix * > &matrix_pt)
Compute infinity (maximum) norm of sub blocks as if it was one matrix.
Definition: matrices.cc:3731
void concatenate(const DenseMatrix< CRDoubleMatrix * > &matrix_pt, CRDoubleMatrix &result_matrix)
Concatenate CRDoubleMatrix matrices. The in matrices are concatenated such that the block structure o...
Definition: matrices.cc:4349
void create_uniformly_distributed_matrix(const unsigned &nrow, const unsigned &ncol, const OomphCommunicator *const comm_pt, const Vector< double > &values, const Vector< int > &column_indices, const Vector< int > &row_start, CRDoubleMatrix &matrix_out)
Builds a uniformly distributed matrix. A locally replicated matrix is constructed then redistributed ...
Definition: matrices.cc:3676
void concatenate(const Vector< LinearAlgebraDistribution * > &in_distribution_pt, LinearAlgebraDistribution &out_distribution)
Takes a vector of LinearAlgebraDistribution objects and concatenates them such that the nrow_local of...
double timer()
returns the time in seconds after some point in past
void multiply(const CRDoubleMatrix *matrix, const DoubleVector &x, DoubleVector &soln)
Function to perform a matrix-vector multiplication on a oomph-lib matrix and vector using Trilinos fu...
//////////////////////////////////////////////////////////////////// ////////////////////////////////...
OomphInfo oomph_info
Single (global) instantiation of the OomphInfo object – this is used throughout the library as a "rep...