bes, updated for version 3.20.5
DmrppArray.cc
// -*- mode: c++; c-basic-offset:4 -*-

// This file is part of the BES

// Copyright (c) 2016 OPeNDAP, Inc.
// Author: James Gallagher <jgallagher@opendap.org>
//
// This library is free software; you can redistribute it and/or
// modify it under the terms of the GNU Lesser General Public
// License as published by the Free Software Foundation; either
// version 2.1 of the License, or (at your option) any later version.
//
// This library is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
// Lesser General Public License for more details.
//
// You should have received a copy of the GNU Lesser General Public
// License along with this library; if not, write to the Free Software
// Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
//
// You can contact OPeNDAP, Inc. at PO Box 112, Saunderstown, RI. 02874-0112.

#include "config.h"

#include <string>
#include <sstream>
#include <iomanip>
#include <vector>
#include <queue>
#include <iterator>

#include <cstring>
#include <cassert>
#include <cerrno>

#include <pthread.h>

#include <unistd.h>

#include <D4Enum.h>
#include <D4EnumDefs.h>
#include <D4Attributes.h>
#include <D4Maps.h>
#include <D4Group.h>

#include "BESLog.h"
#include "BESInternalError.h"
#include "BESDebug.h"

#include "CurlHandlePool.h"
#include "Chunk.h"
#include "DmrppArray.h"
#include "DmrppRequestHandler.h"

// Used with BESDEBUG
static const string dmrpp_3 = "dmrpp:3";
static const string dmrpp_4 = "dmrpp:4";

using namespace libdap;
using namespace std;

namespace dmrpp {

void DmrppArray::_duplicate(const DmrppArray &)
{
}

DmrppArray::DmrppArray(const string &n, BaseType *v) :
    Array(n, v, true /*is dap4*/), DmrppCommon()
{
}

DmrppArray::DmrppArray(const string &n, const string &d, BaseType *v) :
    Array(n, d, v, true), DmrppCommon()
{
}

BaseType *
DmrppArray::ptr_duplicate()
{
    return new DmrppArray(*this);
}

DmrppArray::DmrppArray(const DmrppArray &rhs) :
    Array(rhs), DmrppCommon(rhs)
{
    _duplicate(rhs);
}

DmrppArray &
DmrppArray::operator=(const DmrppArray &rhs)
{
    if (this == &rhs) return *this;

    dynamic_cast<Array &>(*this) = rhs; // run Array::operator=()

    _duplicate(rhs);
    DmrppCommon::m_duplicate_common(rhs);

    return *this;
}

/// Is this Array projected? That is, is the constrained size of any dimension
/// smaller than its full size?
bool DmrppArray::is_projected()
{
    for (Dim_iter p = dim_begin(), e = dim_end(); p != e; ++p)
        if (dimension_size(p, true) != dimension_size(p, false)) return true;

    return false;
}

/// Return the offset, in elements, of the element at 'address_in_target'
/// within a row-major array whose dimensions are given by 'target_shape'.
static unsigned long long get_index(const vector<unsigned int> &address_in_target, const vector<unsigned int> &target_shape)
{
    assert(address_in_target.size() == target_shape.size()); // ranks must be equal

    vector<unsigned int>::const_reverse_iterator shape_index = target_shape.rbegin();
    vector<unsigned int>::const_reverse_iterator index = address_in_target.rbegin(), index_end = address_in_target.rend();

    unsigned long long multiplier = *shape_index++;
    unsigned long long offset = *index++;

    while (index != index_end) {
        assert(*index < *shape_index); // index < shape for each dim

        offset += multiplier * *index++;
        multiplier *= *shape_index++;
    }

    return offset;
}
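
// A worked example (illustrative only): for target_shape = {4, 5, 6} and
// address_in_target = {1, 2, 3}, the row-major offset is
//     1 * (5 * 6) + 2 * 6 + 3 = 30 + 12 + 3 = 45,
// which is what the loop above computes by accumulating the multiplier
// from the fastest-varying (rightmost) dimension outward.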

/// @brief Return the total number of elements in this Array.
unsigned long long DmrppArray::get_size(bool constrained)
{
    // number of array elements in the constrained array
    unsigned long long size = 1;
    for (Dim_iter dim = dim_begin(), end = dim_end(); dim != end; dim++) {
        size *= dimension_size(dim, constrained);
    }
    return size;
}

/// @brief Get the array shape.
vector<unsigned int> DmrppArray::get_shape(bool constrained)
{
    Dim_iter dim = dim_begin(), edim = dim_end();
    vector<unsigned int> shape;

    // For a 3d array, this method took 14ms without reserve(), 5ms with
    // (when called many times).
    shape.reserve(edim - dim);

    for (; dim != edim; dim++) {
        shape.push_back(dimension_size(dim, constrained));
    }

    return shape;
}

/// Return the dimension object for the ith dimension of this array.
DmrppArray::dimension DmrppArray::get_dimension(unsigned int i)
{
    assert(i < (unsigned int)(dim_end() - dim_begin()));
    return *(dim_begin() + i);
}

/// Helper for read_contiguous(): recursively descend the dimensions and copy
/// the constrained elements from 'src_buf' (the whole variable's data) into
/// this array's internal buffer.
void DmrppArray::insert_constrained_contiguous(Dim_iter dimIter, unsigned long *target_index, vector<unsigned int> &subsetAddress,
    const vector<unsigned int> &array_shape, char /*Chunk*/*src_buf)
{
    BESDEBUG("dmrpp", "DmrppArray::"<< __func__ << "() - subsetAddress.size(): " << subsetAddress.size() << endl);

    unsigned int bytesPerElt = prototype()->width();

    char *dest_buf = get_buf();

    unsigned int start = this->dimension_start(dimIter, true);
    unsigned int stop = this->dimension_stop(dimIter, true);
    unsigned int stride = this->dimension_stride(dimIter, true);

    dimIter++;

    // The end case for the recursion is dimIter == dim_end(); stride == 1 is an optimization.
    // See the else clause for the general case.
    if (dimIter == dim_end() && stride == 1) {
        // For the start and stop indexes of the subset, get the matching indexes in the whole array.
        subsetAddress.push_back(start);
        unsigned long start_index = get_index(subsetAddress, array_shape);
        subsetAddress.pop_back();

        subsetAddress.push_back(stop);
        unsigned long stop_index = get_index(subsetAddress, array_shape);
        subsetAddress.pop_back();

        // Copy the data block from start_index to stop_index.
        // TODO Replace this loop with a call to std::memcpy()
        for (unsigned long sourceIndex = start_index; sourceIndex <= stop_index; sourceIndex++) {
            unsigned long target_byte = *target_index * bytesPerElt;
            unsigned long source_byte = sourceIndex * bytesPerElt;
            // Copy a single value.
            for (unsigned long i = 0; i < bytesPerElt; i++) {
                dest_buf[target_byte++] = src_buf[source_byte++];
            }
            (*target_index)++;
        }
    }
    else {
        for (unsigned int myDimIndex = start; myDimIndex <= stop; myDimIndex += stride) {

            // Is this the last dimension?
            if (dimIter != dim_end()) {
                // No, so recurse down to the last dimension, where values are read.
                subsetAddress.push_back(myDimIndex);
                insert_constrained_contiguous(dimIter, target_index, subsetAddress, array_shape, src_buf);
                subsetAddress.pop_back();
            }
            else {
                // We are at the last (innermost) dimension, so it's time to copy values.
                subsetAddress.push_back(myDimIndex);
                unsigned long long sourceIndex = get_index(subsetAddress, array_shape);
                subsetAddress.pop_back();

                // Copy a single value.
                unsigned long target_byte = *target_index * bytesPerElt;
                unsigned long long source_byte = sourceIndex * bytesPerElt;

                for (unsigned int i = 0; i < bytesPerElt; i++) {
                    dest_buf[target_byte++] = src_buf[source_byte++];
                }
                (*target_index)++;
            }
        }
    }
}
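
// A possible std::memcpy() replacement for the innermost byte-copy loop above
// (a sketch only, not tested against this code): when stride == 1 both the
// source and target ranges are contiguous, so the whole block can move at once.
//
//     unsigned long n_elements = stop_index - start_index + 1;
//     memcpy(dest_buf + (*target_index * bytesPerElt),
//            src_buf + (start_index * bytesPerElt),
//            n_elements * bytesPerElt);
//     *target_index += n_elements;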

/// @brief Read an array that is stored as a single, 'contiguous' chunk.
void DmrppArray::read_contiguous()
{
    // These first lines reproduce DmrppCommon::read_atomic(). The call
    // to Chunk::inflate_chunk() handles 'contiguous' data that are compressed.
    // And since we need the chunk, the read_atomic code is copied here.

    vector<Chunk> &chunk_refs = get_chunk_vec();

    if (chunk_refs.size() != 1) throw BESInternalError(string("Expected only a single chunk for variable ") + name(), __FILE__, __LINE__);

    Chunk &chunk = chunk_refs[0];

    // TODO Break this call down so that data can be read in parallel. jhrg 8/21/18
    chunk.read_chunk();

    chunk.inflate_chunk(is_deflate_compression(), is_shuffle_compression(), get_chunk_size_in_elements(), var()->width());

    // 'chunk' now holds the data. Transfer it to the Array.

    if (!is_projected()) { // if there is no projection constraint
        val2buf(chunk.get_rbuf()); // yes, it's not type-safe
    }
    else { // apply the constraint
        vector<unsigned int> array_shape = get_shape(false);

        // Reserve space in this array for the constrained size of the data request.
        reserve_value_capacity(get_size(true));
        unsigned long target_index = 0;
        vector<unsigned int> subset;

        insert_constrained_contiguous(dim_begin(), &target_index, subset, array_shape, chunk.get_rbuf());
    }

    set_read_p(true);
}

/// Given a chunk's origin in one dimension, return the offset within the chunk
/// of the first element the current constraint selects for that dimension.
unsigned long long DmrppArray::get_chunk_start(const dimension &thisDim, unsigned int chunk_origin)
{
    // What's the first element that we are going to access for this dimension of the chunk?
    unsigned long long first_element_offset = 0; // start with 0
    if ((unsigned) (thisDim.start) < chunk_origin) {
        // If the start is behind this chunk, then it's special.
        if (thisDim.stride != 1) {
            // And if the stride isn't 1, we have to figure out where to begin in this chunk.
            first_element_offset = (chunk_origin - thisDim.start) % thisDim.stride;
            // If it's zero, great!
            if (first_element_offset != 0) {
                // Otherwise, adjust to get the correct first element.
                first_element_offset = thisDim.stride - first_element_offset;
            }
        }
    }
    else {
        first_element_offset = thisDim.start - chunk_origin;
    }

    return first_element_offset;
}
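
// A worked example (illustrative only): with start = 2 and stride = 3 the
// constraint selects elements 2, 5, 8, 11, ... If this chunk's origin is 4,
// then (4 - 2) % 3 == 2, so first_element_offset = 3 - 2 = 1: the first
// selected element inside the chunk is at offset 1 (array index 5), as expected.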

#ifdef USE_READ_SERIAL

/// Insert a chunk into this array when chunks are read serially. Compiled
/// only when USE_READ_SERIAL is defined.
void DmrppArray::insert_chunk_serial(unsigned int dim, vector<unsigned int> *target_element_address, vector<unsigned int> *chunk_element_address,
    Chunk *chunk)
{
    BESDEBUG("dmrpp", __func__ << " dim: "<< dim << " BEGIN "<< endl);

    // The size, in elements, of each of the chunk's dimensions.
    const vector<unsigned int> &chunk_shape = get_chunk_dimension_sizes();

    // The chunk's origin point a.k.a. its "position in array".
    const vector<unsigned int> &chunk_origin = chunk->get_position_in_array();

    dimension thisDim = this->get_dimension(dim);

    // Do we even want this chunk?
    if ((unsigned) thisDim.start > (chunk_origin[dim] + chunk_shape[dim]) || (unsigned) thisDim.stop < chunk_origin[dim]) {
        return; // No. No, we do not. Skip this.
    }

    // What's the first element that we are going to access for this dimension of the chunk?
    unsigned int first_element_offset = get_chunk_start(thisDim, chunk_origin[dim]);

    // Is the next point to be sent in this chunk at all? If not, return.
    if (first_element_offset > chunk_shape[dim]) {
        return;
    }

    // Now we figure out the correct last element, based on the subset expression.
    unsigned long long end_element = chunk_origin[dim] + chunk_shape[dim] - 1;
    if ((unsigned) thisDim.stop < end_element) {
        end_element = thisDim.stop;
    }

    unsigned long long chunk_start = first_element_offset; //start_element - chunk_origin[dim];
    unsigned long long chunk_end = end_element - chunk_origin[dim];
    vector<unsigned int> constrained_array_shape = get_shape(true);

    unsigned int last_dim = chunk_shape.size() - 1;
    if (dim == last_dim) {
        // Read and process the chunk.
        chunk->read_chunk();

        chunk->inflate_chunk(is_deflate_compression(), is_shuffle_compression(), get_chunk_size_in_elements(), var()->width());

        char *source_buffer = chunk->get_rbuf();
        char *target_buffer = get_buf();
        unsigned int elem_width = prototype()->width();

        if (thisDim.stride == 1) {
            // The start element in this array
            unsigned long long start_element = chunk_origin[dim] + first_element_offset;
            // Compute how much we are going to copy
            unsigned long long chunk_constrained_inner_dim_bytes = (end_element - start_element + 1) * elem_width;

            // Compute where we need to put it.
            (*target_element_address)[dim] = (start_element - thisDim.start) / thisDim.stride;
            // Compute where we are going to read it from
            (*chunk_element_address)[dim] = first_element_offset;

            unsigned int target_char_start_index = get_index(*target_element_address, constrained_array_shape) * elem_width;
            unsigned int chunk_char_start_index = get_index(*chunk_element_address, chunk_shape) * elem_width;

            memcpy(target_buffer + target_char_start_index, source_buffer + chunk_char_start_index, chunk_constrained_inner_dim_bytes);
        }
        else {
            // Stride != 1
            for (unsigned int chunk_index = chunk_start; chunk_index <= chunk_end; chunk_index += thisDim.stride) {
                // Compute where we need to put it.
                (*target_element_address)[dim] = (chunk_index + chunk_origin[dim] - thisDim.start) / thisDim.stride;

                // Compute where we are going to read it from
                (*chunk_element_address)[dim] = chunk_index;

                unsigned int target_char_start_index = get_index(*target_element_address, constrained_array_shape) * elem_width;
                unsigned int chunk_char_start_index = get_index(*chunk_element_address, chunk_shape) * elem_width;

                memcpy(target_buffer + target_char_start_index, source_buffer + chunk_char_start_index, elem_width);
            }
        }
    }
    else {
        // Not the last dimension, so we continue to proceed down the recursion branch.
        for (unsigned int chunk_index = chunk_start; chunk_index <= chunk_end; chunk_index += thisDim.stride) {
            (*target_element_address)[dim] = (chunk_index + chunk_origin[dim] - thisDim.start) / thisDim.stride;
            (*chunk_element_address)[dim] = chunk_index;

            // Re-entry here:
            insert_chunk_serial(dim + 1, target_element_address, chunk_element_address, chunk);
        }
    }
}

void DmrppArray::read_chunks_serial()
{
    BESDEBUG("dmrpp", __func__ << " for variable '" << name() << "' - BEGIN" << endl);

    vector<Chunk> &chunk_refs = get_chunk_vec();
    if (chunk_refs.size() == 0) throw BESInternalError(string("Expected one or more chunks for variable ") + name(), __FILE__, __LINE__);

    // Allocate target memory.
    reserve_value_capacity(get_size(true));

    /*
     * Find the chunks to be read, make curl_easy handles for them, and
     * stuff them into our curl_multi handle. This is a recursive activity
     * which utilizes the same code that copies the data from the chunk to
     * the variables.
     */
    for (unsigned long i = 0; i < chunk_refs.size(); i++) {
        Chunk &chunk = chunk_refs[i];

        vector<unsigned int> chunk_source_address(dimensions(), 0);
        vector<unsigned int> target_element_address = chunk.get_position_in_array();

        // Recursive insertion operation.
        insert_chunk_serial(0, &target_element_address, &chunk_source_address, &chunk);
    }

    set_read_p(true);

    BESDEBUG("dmrpp", "DmrppArray::"<< __func__ << "() for " << name() << " END"<< endl);
}
#endif

/// Determine whether a chunk intersects the current constraint. Returns the
/// chunk if any part of it is needed, 0 (null) otherwise.
Chunk *
DmrppArray::find_needed_chunks(unsigned int dim, vector<unsigned int> *target_element_address, Chunk *chunk)
{
    BESDEBUG(dmrpp_3, __func__ << " BEGIN, dim: " << dim << endl);

    // The size, in elements, of each of the chunk's dimensions.
    const vector<unsigned int> &chunk_shape = get_chunk_dimension_sizes();

    // The chunk's origin point a.k.a. its "position in array".
    const vector<unsigned int> &chunk_origin = chunk->get_position_in_array();

    dimension thisDim = this->get_dimension(dim);

    // Do we even want this chunk?
    if ((unsigned) thisDim.start > (chunk_origin[dim] + chunk_shape[dim]) || (unsigned) thisDim.stop < chunk_origin[dim]) {
        return 0; // No. No, we do not. Skip this chunk.
    }

    // What's the first element that we are going to access for this dimension of the chunk?
    unsigned long long chunk_start = get_chunk_start(thisDim, chunk_origin[dim]);

    // Is the next point to be sent in this chunk at all? If not, return.
    if (chunk_start > chunk_shape[dim]) {
        return 0;
    }

    // Now we figure out the correct last element, based on the subset expression.
    unsigned long long end_element = chunk_origin[dim] + chunk_shape[dim] - 1;
    if ((unsigned) thisDim.stop < end_element) {
        end_element = thisDim.stop;
    }

    unsigned long long chunk_end = end_element - chunk_origin[dim];

    unsigned int last_dim = chunk_shape.size() - 1;
    if (dim == last_dim) {
        return chunk;
    }
    else {
        // Not the last dimension, so we continue to proceed down the recursion branch.
        for (unsigned int chunk_index = chunk_start; chunk_index <= chunk_end; chunk_index += thisDim.stride) {
            (*target_element_address)[dim] = (chunk_index + chunk_origin[dim] - thisDim.start) / thisDim.stride;

            // Re-entry here:
            Chunk *needed = find_needed_chunks(dim + 1, target_element_address, chunk);
            if (needed) return needed;
        }
    }

    return 0;
}
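
// A worked example (illustrative only): for a 1-D array with the constraint
// start = 100, stop = 200 and chunks of 50 elements, the chunk at origin 150
// fails neither rejection test above (100 <= 150 + 50 and 200 >= 150), so it
// is returned; a chunk at origin 250 fails the second test (200 < 250) and
// yields 0.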

/// Insert the data from one chunk of this (constrained) array into the
/// array's memory, recursing over the array's dimensions.
void DmrppArray::insert_chunk(unsigned int dim, vector<unsigned int> *target_element_address, vector<unsigned int> *chunk_element_address,
    Chunk *chunk, const vector<unsigned int> &constrained_array_shape)
{
    // The size, in elements, of each of the chunk's dimensions.
    const vector<unsigned int> &chunk_shape = get_chunk_dimension_sizes();

    // The chunk's origin point a.k.a. its "position in array".
    const vector<unsigned int> &chunk_origin = chunk->get_position_in_array();

    dimension thisDim = this->get_dimension(dim);

    // What's the first element that we are going to access for this dimension of the chunk?
    unsigned long long chunk_start = get_chunk_start(thisDim, chunk_origin[dim]);

    // Now we figure out the correct last element, based on the subset expression.
    unsigned long long end_element = chunk_origin[dim] + chunk_shape[dim] - 1;
    if ((unsigned) thisDim.stop < end_element) {
        end_element = thisDim.stop;
    }

    unsigned long long chunk_end = end_element - chunk_origin[dim];

    unsigned int last_dim = chunk_shape.size() - 1;
    if (dim == last_dim) {
        char *source_buffer = chunk->get_rbuf();
        char *target_buffer = get_buf();
        unsigned int elem_width = prototype()->width();

        if (thisDim.stride == 1) {
            // The start element in this array
            unsigned long long start_element = chunk_origin[dim] + chunk_start;
            // Compute how much we are going to copy
            unsigned long long chunk_constrained_inner_dim_bytes = (end_element - start_element + 1) * elem_width;

            // Compute where we need to put it.
            (*target_element_address)[dim] = (start_element - thisDim.start); // / thisDim.stride;
            // Compute where we are going to read it from
            (*chunk_element_address)[dim] = chunk_start;

            // See below re get_index()
            unsigned int target_char_start_index = get_index(*target_element_address, constrained_array_shape) * elem_width;
            unsigned int chunk_char_start_index = get_index(*chunk_element_address, chunk_shape) * elem_width;

            memcpy(target_buffer + target_char_start_index, source_buffer + chunk_char_start_index, chunk_constrained_inner_dim_bytes);
        }
        else {
            // Stride != 1
            for (unsigned int chunk_index = chunk_start; chunk_index <= chunk_end; chunk_index += thisDim.stride) {
                // Compute where we need to put it.
                (*target_element_address)[dim] = (chunk_index + chunk_origin[dim] - thisDim.start) / thisDim.stride;

                // Compute where we are going to read it from
                (*chunk_element_address)[dim] = chunk_index;

                // These calls to get_index() could be removed, as with the insert...unconstrained() code.
                unsigned int target_char_start_index = get_index(*target_element_address, constrained_array_shape) * elem_width;
                unsigned int chunk_char_start_index = get_index(*chunk_element_address, chunk_shape) * elem_width;

                memcpy(target_buffer + target_char_start_index, source_buffer + chunk_char_start_index, elem_width);
            }
        }
    }
    else {
        // Not the last dimension, so we continue to proceed down the recursion branch.
        for (unsigned int chunk_index = chunk_start; chunk_index <= chunk_end; chunk_index += thisDim.stride) {
            (*target_element_address)[dim] = (chunk_index + chunk_origin[dim] - thisDim.start) / thisDim.stride;
            (*chunk_element_address)[dim] = chunk_index;

            // Re-entry here:
            insert_chunk(dim + 1, target_element_address, chunk_element_address, chunk, constrained_array_shape);
        }
    }
}

/// @brief Read chunked data for this array when a constraint is in effect.
void DmrppArray::read_chunks()
{
    vector<Chunk> &chunk_refs = get_chunk_vec();
    if (chunk_refs.size() == 0) throw BESInternalError(string("Expected one or more chunks for variable ") + name(), __FILE__, __LINE__);

    // Find all the chunks to read. A queue is used to preserve the chunk order,
    // which makes using a debugger easier. However, order does not matter, AFAIK.
    queue<Chunk *> chunks_to_read;

    // Look at all the chunks.
    for (vector<Chunk>::iterator c = chunk_refs.begin(), e = chunk_refs.end(); c != e; ++c) {
        Chunk &chunk = *c;

        vector<unsigned int> target_element_address = chunk.get_position_in_array();
        Chunk *needed = find_needed_chunks(0 /* dimension */, &target_element_address, &chunk);
        if (needed) chunks_to_read.push(needed);
    }

    reserve_value_capacity(get_size(true));
    vector<unsigned int> constrained_array_shape = get_shape(true);

    BESDEBUG(dmrpp_3, "d_use_parallel_transfers: " << DmrppRequestHandler::d_use_parallel_transfers << endl);
    BESDEBUG(dmrpp_3, "d_max_parallel_transfers: " << DmrppRequestHandler::d_max_parallel_transfers << endl);

    if (DmrppRequestHandler::d_use_parallel_transfers) {
        // This is the parallel version of the code. It reads a set of chunks in parallel
        // using the multi curl API, then inserts them, then reads the next set, ... jhrg 5/1/18
        unsigned int max_handles = DmrppRequestHandler::curl_handle_pool->get_max_handles();
        dmrpp_multi_handle *mhandle = DmrppRequestHandler::curl_handle_pool->get_multi_handle();

        // Look only at the chunks we need, found above. jhrg 4/30/18
        while (chunks_to_read.size() > 0) {
            queue<Chunk*> chunks_to_insert;
            for (unsigned int i = 0; i < max_handles && chunks_to_read.size() > 0; ++i) {
                Chunk *chunk = chunks_to_read.front();
                chunks_to_read.pop();

                chunk->set_rbuf_to_size();
                dmrpp_easy_handle *handle = DmrppRequestHandler::curl_handle_pool->get_easy_handle(chunk);
                if (!handle) throw BESInternalError("No more libcurl handles.", __FILE__, __LINE__);

                BESDEBUG(dmrpp_3, "Queuing: " << chunk->to_string() << endl);
                mhandle->add_easy_handle(handle);

                chunks_to_insert.push(chunk);
            }

            mhandle->read_data(); // read, then remove the easy_handles

            while (chunks_to_insert.size() > 0) {
                Chunk *chunk = chunks_to_insert.front();
                chunks_to_insert.pop();

                chunk->inflate_chunk(is_deflate_compression(), is_shuffle_compression(), get_chunk_size_in_elements(), var()->width());

                vector<unsigned int> target_element_address = chunk->get_position_in_array();
                vector<unsigned int> chunk_source_address(dimensions(), 0);

                BESDEBUG(dmrpp_3, "Inserting: " << chunk->to_string() << endl);
                insert_chunk(0 /* dimension */, &target_element_address, &chunk_source_address, chunk, constrained_array_shape);
            }
        }
    }
    else {
        // This is the 'serial' version of the code. It reads a chunk, inserts it,
        // reads the next one, and so on.
        while (chunks_to_read.size() > 0) {
            Chunk *chunk = chunks_to_read.front();
            chunks_to_read.pop();

            BESDEBUG(dmrpp_3, "Reading: " << chunk->to_string() << endl);
            chunk->read_chunk();

            chunk->inflate_chunk(is_deflate_compression(), is_shuffle_compression(), get_chunk_size_in_elements(), var()->width());

            vector<unsigned int> target_element_address = chunk->get_position_in_array();
            vector<unsigned int> chunk_source_address(dimensions(), 0);

            BESDEBUG(dmrpp_3, "Inserting: " << chunk->to_string() << endl);
            insert_chunk(0 /* dimension */, &target_element_address, &chunk_source_address, chunk, constrained_array_shape);
        }
    }

    set_read_p(true);
}

/// Return the product of the sizes of dimensions k+1 .. N-1 of 'shape'; this
/// is the stride, in elements, between consecutive indices of dimension k in
/// the flattened, row-major array.
static unsigned long multiplier(const vector<unsigned int> &shape, unsigned int k)
{
    assert(shape.size() > 1);
    assert(shape.size() > k + 1);

    vector<unsigned int>::const_iterator i = shape.begin(), e = shape.end();
    advance(i, k + 1);
    unsigned long multiplier = *i++;
    while (i != e) {
        multiplier *= *i++;
    }

    return multiplier;
}
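
// A worked example (illustrative only): for shape = {4, 5, 6},
// multiplier(shape, 0) == 5 * 6 == 30 and multiplier(shape, 1) == 6; that is,
// advancing one step along dimension k skips multiplier(shape, k) elements in
// the flattened array.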

/// Insert a chunk into the array when no constraint is in effect. Unlike
/// insert_chunk(), the linear offsets are computed incrementally using
/// multiplier(), avoiding repeated calls to get_index().
void DmrppArray::insert_chunk_unconstrained(Chunk *chunk, unsigned int dim, unsigned long long array_offset, const vector<unsigned int> &array_shape,
    unsigned long long chunk_offset, const vector<unsigned int> &chunk_shape, const vector<unsigned int> &chunk_origin)
{
    // Now we figure out the correct last element. It's possible that a
    // chunk 'extends beyond' the Array bounds. Here 'end_element' is the
    // last element of the destination array.
    dimension thisDim = this->get_dimension(dim);
    unsigned long long end_element = chunk_origin[dim] + chunk_shape[dim] - 1;
    if ((unsigned) thisDim.stop < end_element) {
        end_element = thisDim.stop;
    }

    unsigned long long chunk_end = end_element - chunk_origin[dim];

    unsigned int last_dim = chunk_shape.size() - 1;
    if (dim == last_dim) {
        unsigned int elem_width = prototype()->width();

        array_offset += chunk_origin[dim];

        // Compute how much we are going to copy.
        unsigned long long chunk_bytes = (end_element - chunk_origin[dim] + 1) * elem_width;
        char *source_buffer = chunk->get_rbuf();
        char *target_buffer = get_buf();
        memcpy(target_buffer + (array_offset * elem_width), source_buffer + (chunk_offset * elem_width), chunk_bytes);
    }
    else {
        unsigned long mc = multiplier(chunk_shape, dim);
        unsigned long ma = multiplier(array_shape, dim);

        // Not the last dimension, so we continue to proceed down the recursion branch.
        for (unsigned int chunk_index = 0 /*chunk_start*/; chunk_index <= chunk_end; ++chunk_index) {
            unsigned long long next_chunk_offset = chunk_offset + (mc * chunk_index);
            unsigned long long next_array_offset = array_offset + (ma * (chunk_index + chunk_origin[dim]));

            // Re-entry here:
            insert_chunk_unconstrained(chunk, dim + 1, next_array_offset, array_shape, next_chunk_offset, chunk_shape, chunk_origin);
        }
    }
}
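
// A worked example (illustrative only): for a 2-D array of shape {8, 8} with
// chunks of shape {4, 4}, ma == multiplier({8, 8}, 0) == 8 and
// mc == multiplier({4, 4}, 0) == 4. For the chunk at origin {4, 4}, row r of
// the chunk starts at array offset 8 * (r + 4) and chunk offset 4 * r; the
// innermost call then memcpy()s one 4-element row at a time into place.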

/// Read, decompress, and insert one chunk; used for unconstrained reads.
void process_one_chunk_unconstrained(Chunk *chunk, DmrppArray *array, const vector<unsigned int> &array_shape,
    const vector<unsigned int> &chunk_shape)
{
    chunk->read_chunk();

    if (array->is_deflate_compression() || array->is_shuffle_compression())
        chunk->inflate_chunk(array->is_deflate_compression(), array->is_shuffle_compression(), array->get_chunk_size_in_elements(),
            array->var()->width());

    array->insert_chunk_unconstrained(chunk, 0, 0, array_shape, 0, chunk_shape, chunk->get_position_in_array());
}

/// pthread start routine: process one chunk, then write this thread's id to a
/// pipe so the parent knows which thread to join.
void *one_chunk_unconstrained_thread(void *arg_list)
{
    one_chunk_unconstrained_args *args = reinterpret_cast<one_chunk_unconstrained_args*>(arg_list);

    try {
        process_one_chunk_unconstrained(args->chunk, args->array, args->array_shape, args->chunk_shape);
    }
    catch (BESError &error) {
        write(args->fds[1], &args->tid, sizeof(args->tid));
        delete args;
        pthread_exit(new string(error.get_verbose_message()));
    }

    // tid is a char and thus is written atomically. Writing this tells the parent
    // thread the child is complete and it should call pthread_join(tid, ...).
    write(args->fds[1], &args->tid, sizeof(args->tid));

    delete args;
    pthread_exit(NULL);
}
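
// The completion protocol, as sketched here: each child thread writes its
// one-byte id to fds[1] exactly once, whether it succeeds or throws, and exits
// with either NULL or a new'd string holding the error message. The parent
// (read_chunks_unconstrained(), below) blocks reading fds[0], joins the thread
// named by the byte it reads, and inspects the value pthread_join() returns.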

/// @brief Read chunked data for this array when no constraint is in effect.
void DmrppArray::read_chunks_unconstrained()
{
    vector<Chunk> &chunk_refs = get_chunk_vec();
    if (chunk_refs.size() == 0) throw BESInternalError(string("Expected one or more chunks for variable ") + name(), __FILE__, __LINE__);

    reserve_value_capacity(get_size());

    // The size, in elements, of each of the array's dimensions.
    const vector<unsigned int> array_shape = get_shape(true);
    // The size, in elements, of each of the chunk's dimensions.
    const vector<unsigned int> chunk_shape = get_chunk_dimension_sizes();

    BESDEBUG(dmrpp_3, __func__ << endl);

    BESDEBUG(dmrpp_3, "d_use_parallel_transfers: " << DmrppRequestHandler::d_use_parallel_transfers << endl);
    BESDEBUG(dmrpp_3, "d_max_parallel_transfers: " << DmrppRequestHandler::d_max_parallel_transfers << endl);

    if (DmrppRequestHandler::d_use_parallel_transfers) {
        queue<Chunk *> chunks_to_read;

        // Queue all of the chunks.
        for (vector<Chunk>::iterator c = chunk_refs.begin(), e = chunk_refs.end(); c != e; ++c)
            chunks_to_read.push(&(*c));

        // This pipe is used by the child threads to signal completion.
        int fds[2];
        int status = pipe(fds);
        if (status < 0)
            throw BESInternalError(string("Could not open a pipe for thread communication: ").append(strerror(errno)), __FILE__, __LINE__);

        // Start the max number of processing pipelines.
        pthread_t threads[DmrppRequestHandler::d_max_parallel_transfers];

        // Zero the threads[] elements; this serves as a sentinel value.
        for (unsigned int i = 0; i < (unsigned int)DmrppRequestHandler::d_max_parallel_transfers; ++i) {
            memset(&threads[i], 0, sizeof(pthread_t));
        }

        try {
            unsigned int num_threads = 0;
            for (unsigned int i = 0; i < (unsigned int) DmrppRequestHandler::d_max_parallel_transfers && chunks_to_read.size() > 0; ++i) {
                Chunk *chunk = chunks_to_read.front();
                chunks_to_read.pop();

                // The thread number is 'i'.
                one_chunk_unconstrained_args *args = new one_chunk_unconstrained_args(fds, i, chunk, this, array_shape, chunk_shape);
                int status = pthread_create(&threads[i], NULL, dmrpp::one_chunk_unconstrained_thread, (void*) args);
                if (status == 0) {
                    ++num_threads;
                    BESDEBUG(dmrpp_3, "started thread: " << i << endl);
                }
                else {
                    ostringstream oss("Could not start process_one_chunk_unconstrained thread for chunk ", std::ios::ate);
                    oss << i << ": " << strerror(status);
                    BESDEBUG(dmrpp_3, oss.str());
                    throw BESInternalError(oss.str(), __FILE__, __LINE__);
                }
            }

            // Now join the child threads, creating replacement threads if needed.
            while (num_threads > 0) {
                unsigned char tid; // a single byte is written atomically
                // Block here until a child thread writes to the pipe, then read the byte.
                int bytes = ::read(fds[0], &tid, sizeof(tid));
                if (bytes != sizeof(tid))
                    throw BESInternalError(string("Could not read the thread id: ").append(strerror(errno)), __FILE__, __LINE__);

                if (!(tid < DmrppRequestHandler::d_max_parallel_transfers)) {
                    ostringstream oss("Invalid thread id read after thread exit: ", std::ios::ate);
                    oss << (unsigned int) tid;
                    throw BESInternalError(oss.str(), __FILE__, __LINE__);
                }

                string *error;
                int status = pthread_join(threads[tid], (void**)&error);
                --num_threads;
                BESDEBUG(dmrpp_3, "joined thread: " << (unsigned int)tid << ", there are: " << num_threads << endl);

                if (status != 0) {
                    ostringstream oss("Could not join process_one_chunk_unconstrained thread for chunk ", std::ios::ate);
                    oss << (unsigned int) tid << ": " << strerror(status);
                    throw BESInternalError(oss.str(), __FILE__, __LINE__);
                }
                else if (error != 0) {
                    BESInternalError e(*error, __FILE__, __LINE__);
                    delete error;
                    throw e;
                }
                else if (chunks_to_read.size() > 0) {
                    Chunk *chunk = chunks_to_read.front();
                    chunks_to_read.pop();

                    // The thread number is 'tid', the id of the thread that just completed.
                    one_chunk_unconstrained_args *args = new one_chunk_unconstrained_args(fds, tid, chunk, this, array_shape, chunk_shape);
                    int status = pthread_create(&threads[tid], NULL, dmrpp::one_chunk_unconstrained_thread, (void*) args);
                    if (status != 0) {
                        ostringstream oss;
                        oss << "Could not start process_one_chunk_unconstrained thread for chunk " << (unsigned int) tid << ": " << strerror(status);
                        throw BESInternalError(oss.str(), __FILE__, __LINE__);
                    }
                    ++num_threads;
                    BESDEBUG(dmrpp_3, "started thread: " << (unsigned int)tid << ", there are: " << num_threads << endl);
                }
            }

            // Once done with the threads, close the communication pipe.
            close(fds[0]);
            close(fds[1]);
        }
        catch (...) {
            // Join any outstanding threads; otherwise they would be left running,
            // using up resources. join_threads() is defined in DmrppCommon.cc.
            join_threads(threads, DmrppRequestHandler::d_max_parallel_transfers);
            // Close the pipe used to communicate with the child threads.
            close(fds[0]);
            close(fds[1]);
            // Re-throw the exception.
            throw;
        }
    }
    else { // Serial transfers
        for (vector<Chunk>::iterator c = chunk_refs.begin(), e = chunk_refs.end(); c != e; ++c) {
            Chunk &chunk = *c;
            process_one_chunk_unconstrained(&chunk, this, array_shape, chunk_shape);
        }
    }

    set_read_p(true);
}

/// @brief Read data for the array.
bool DmrppArray::read()
{
    if (read_p()) return true;

    // Single chunk and 'contiguous' are the same for this code.

    if (get_immutable_chunks().size() == 1 || get_chunk_dimension_sizes().empty()) {
        BESDEBUG(dmrpp_4, "Calling read_contiguous() for " << name() << endl);
        read_contiguous(); // Throws on various errors
    }
    else { // Handle the more complex case where the data is chunked.
        if (!is_projected()) {
            BESDEBUG(dmrpp_4, "Calling read_chunks_unconstrained() for " << name() << endl);
            read_chunks_unconstrained();
        }
        else {
            BESDEBUG(dmrpp_4, "Calling read_chunks() for " << name() << endl);
            read_chunks();
        }
    }

    return true;
}

/// Functor that writes out the Dim elements for an Array's dimensions.
class PrintD4ArrayDimXMLWriter: public unary_function<Array::dimension&, void> {
    XMLWriter &xml;
    // Was this variable constrained using local/direct slicing? i.e., is d_local_constraint set?
    // If so, don't use shared dimensions; instead emit Dim elements that are anonymous.
    bool d_constrained;
public:

    PrintD4ArrayDimXMLWriter(XMLWriter &xml, bool c) :
        xml(xml), d_constrained(c)
    {
    }

    void operator()(Array::dimension &d)
    {
        // This duplicates code in D4Dimensions (where D4Dimension::print_dap4() is defined)
        // because of the need to print the constrained size of a dimension. I think that
        // the constraint information has to be kept here and not in the dimension (since they
        // are shared dims). Could hack print_dap4() to take the constrained size, however.
        if (xmlTextWriterStartElement(xml.get_writer(), (const xmlChar*) "Dim") < 0)
            throw InternalErr(__FILE__, __LINE__, "Could not write Dim element");

        string name = (d.dim) ? d.dim->fully_qualified_name() : d.name;
        // If there is a name, there must be a Dimension (named dimension) in scope,
        // so write its name but not its size.
        if (!d_constrained && !name.empty()) {
            if (xmlTextWriterWriteAttribute(xml.get_writer(), (const xmlChar*) "name", (const xmlChar*) name.c_str()) < 0)
                throw InternalErr(__FILE__, __LINE__, "Could not write attribute for name");
        }
        else if (d.use_sdim_for_slice) {
            assert(!name.empty());
            if (xmlTextWriterWriteAttribute(xml.get_writer(), (const xmlChar*) "name", (const xmlChar*) name.c_str()) < 0)
                throw InternalErr(__FILE__, __LINE__, "Could not write attribute for name");
        }
        else {
            ostringstream size;
            size << (d_constrained ? d.c_size : d.size);
            if (xmlTextWriterWriteAttribute(xml.get_writer(), (const xmlChar*) "size", (const xmlChar*) size.str().c_str()) < 0)
                throw InternalErr(__FILE__, __LINE__, "Could not write attribute for size");
        }

        if (xmlTextWriterEndElement(xml.get_writer()) < 0) throw InternalErr(__FILE__, __LINE__, "Could not end Dim element");
    }
};
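
// Example output (illustrative only): for an unconstrained array that uses a
// shared dimension, the functor above emits something like <Dim name="/lat"/>;
// for a locally sliced (constrained) array it emits an anonymous
// <Dim size="10"/> instead.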

class PrintD4ConstructorVarXMLWriter: public unary_function<BaseType*, void> {
    XMLWriter &xml;
    bool d_constrained;
public:
    PrintD4ConstructorVarXMLWriter(XMLWriter &xml, bool c) :
        xml(xml), d_constrained(c)
    {
    }

    void operator()(BaseType *btp)
    {
        btp->print_dap4(xml, d_constrained);
    }
};

class PrintD4MapXMLWriter: public unary_function<D4Map*, void> {
    XMLWriter &xml;

public:
    PrintD4MapXMLWriter(XMLWriter &xml) :
        xml(xml)
    {
    }

    void operator()(D4Map *m)
    {
        m->print_dap4(xml);
    }
};

/// @brief Shadow libdap::Array::print_dap4() - optionally prints DMR++ chunk information.
void DmrppArray::print_dap4(XMLWriter &xml, bool constrained /*false*/)
{
    if (constrained && !send_p()) return;

    if (xmlTextWriterStartElement(xml.get_writer(), (const xmlChar*) var()->type_name().c_str()) < 0)
        throw InternalErr(__FILE__, __LINE__, "Could not write " + type_name() + " element");

    if (!name().empty())
        if (xmlTextWriterWriteAttribute(xml.get_writer(), (const xmlChar*) "name", (const xmlChar*) name().c_str()) < 0)
            throw InternalErr(__FILE__, __LINE__, "Could not write attribute for name");

    // Hack job... Copied from D4Enum::print_xml_writer. jhrg 11/12/13
    if (var()->type() == dods_enum_c) {
        D4Enum *e = static_cast<D4Enum*>(var());
        string path = e->enumeration()->name();
        if (e->enumeration()->parent()) {
            // Print the FQN for the enum def; D4Group::FQN() includes the trailing '/'.
            path = static_cast<D4Group*>(e->enumeration()->parent()->parent())->FQN() + path;
        }
        if (xmlTextWriterWriteAttribute(xml.get_writer(), (const xmlChar*) "enum", (const xmlChar*) path.c_str()) < 0)
            throw InternalErr(__FILE__, __LINE__, "Could not write attribute for enum");
    }

    if (prototype()->is_constructor_type()) {
        Constructor &c = static_cast<Constructor&>(*prototype());
        for_each(c.var_begin(), c.var_end(), PrintD4ConstructorVarXMLWriter(xml, constrained));
    }

    // Drop the local_constraint, which is per-array, and use a per-dimension one instead.
    for_each(dim_begin(), dim_end(), PrintD4ArrayDimXMLWriter(xml, constrained));

    attributes()->print_dap4(xml);

    for_each(maps()->map_begin(), maps()->map_end(), PrintD4MapXMLWriter(xml));

    // Only print the chunks info if it's present. This is the code added to
    // libdap::Array::print_dap4(). jhrg 5/10/18
    if (DmrppCommon::d_print_chunks && get_immutable_chunks().size() > 0) print_chunks_element(xml, DmrppCommon::d_ns_prefix);

    if (xmlTextWriterEndElement(xml.get_writer()) < 0) throw InternalErr(__FILE__, __LINE__, "Could not end " + type_name() + " element");
}

void DmrppArray::dump(ostream & strm) const
{
    strm << BESIndent::LMarg << "DmrppArray::" << __func__ << "(" << (void *) this << ")" << endl;
    BESIndent::Indent();
    DmrppCommon::dump(strm);
    Array::dump(strm);
    strm << BESIndent::LMarg << "value: " << "----" << /*d_buf <<*/ endl;
    BESIndent::UnIndent();
}

} // namespace dmrpp