XRootD
Loading...
Searching...
No Matches
XrdEc::block_t Struct Reference
Collaboration diagram for XrdEc::block_t:

Public Types

typedef std::tuple< uint64_t, uint32_t, char *, callback_targs_t
typedef std::vector< args_tpending_t
enum  state_t {
  Empty = 0 ,
  Loading ,
  Valid ,
  Missing ,
  Recovering
}

Public Member Functions

 block_t (size_t blkid, Reader &reader, ObjCfg &objcfg)
void carryout (pending_t &pending, const buffer_t &stripe, const XrdCl::XRootDStatus &st=XrdCl::XRootDStatus())
void fail_missing ()
stripes_t get_stripes ()

Static Public Member Functions

static bool error_correction (std::shared_ptr< block_t > &self)
static void read (std::shared_ptr< block_t > &self, size_t strpid, uint64_t offset, uint32_t size, char *usrbuff, callback_t usrcb, uint16_t timeout)
static callback_t read_callback (std::shared_ptr< block_t > &self, size_t strpid)

Public Attributes

size_t blkid
std::mutex mtx
ObjCfgobjcfg
std::vector< pending_tpending
Readerreader
bool recovering
std::vector< state_tstate
std::vector< buffer_tstripes

Detailed Description

Definition at line 117 of file XrdEcReader.cc.

Member Typedef Documentation

◆ args_t

typedef std::tuple<uint64_t, uint32_t, char*, callback_t> XrdEc::block_t::args_t

Definition at line 119 of file XrdEcReader.cc.

◆ pending_t

typedef std::vector<args_t> XrdEc::block_t::pending_t

Definition at line 120 of file XrdEcReader.cc.

Member Enumeration Documentation

◆ state_t

Enumerator
Empty 
Loading 
Valid 
Missing 
Recovering 

Definition at line 125 of file XrdEcReader.cc.

Constructor & Destructor Documentation

◆ block_t()

XrdEc::block_t::block_t ( size_t blkid,
Reader & reader,
ObjCfg & objcfg )
inline

Definition at line 130 of file XrdEcReader.cc.

130 : reader( reader ),
131 objcfg( objcfg ),
132 stripes( objcfg.nbchunks ),
133 state( objcfg.nbchunks, Empty ),
134 pending( objcfg.nbchunks ),
135 blkid( blkid ),
136 recovering( 0 )
137 {
138 }
std::vector< state_t > state
std::vector< buffer_t > stripes
std::vector< pending_t > pending

References blkid, Empty, objcfg, pending, reader, recovering, state, and stripes.

Member Function Documentation

◆ carryout()

void XrdEc::block_t::carryout ( pending_t & pending,
const buffer_t & stripe,
const XrdCl::XRootDStatus & st = XrdCl::XRootDStatus() )
inline

Definition at line 357 of file XrdEcReader.cc.

360 {
361 //-----------------------------------------------------------------------
362 // Iterate over all pending read operations for given stripe
363 //-----------------------------------------------------------------------
364 auto itr = pending.begin();
365 for( ; itr != pending.end() ; ++itr )
366 {
367 auto &args = *itr;
368 callback_t &callback = std::get<3>( args );
369 uint32_t nbrd = 0; // number of bytes read
370 //---------------------------------------------------------------------
371 // If the read was successful, copy the data to user buffer
372 //---------------------------------------------------------------------
373 if( st.IsOK() )
374 {
375 uint64_t offset = std::get<0>( args );
376 uint32_t size = std::get<1>( args );
377 char *usrbuff = std::get<2>( args );
378 // are we reading past the end of file?
379 if( offset > stripe.size() )
380 size = 0;
381 // are partially reading past the end of the file?
382 else if( offset + size > stripe.size() )
383 size = stripe.size() - offset;
384 if(usrbuff)
385 memcpy( usrbuff, stripe.data() + offset, size );
386 nbrd = size;
387 }
388 //---------------------------------------------------------------------
389 // Call the user callback
390 //---------------------------------------------------------------------
391 callback( st, nbrd );
392 }
393 //-----------------------------------------------------------------------
394 // Now we can clear the pending reads
395 //-----------------------------------------------------------------------
396 pending.clear();
397 }
std::function< void(const XrdCl::XRootDStatus &, uint32_t)> callback_t
bool IsOK() const
We're fine.

References pending.

Referenced by fail_missing().

Here is the caller graph for this function:

◆ error_correction()

bool XrdEc::block_t::error_correction ( std::shared_ptr< block_t > & self)
inlinestatic

Definition at line 222 of file XrdEcReader.cc.

223 {
224 //---------------------------------------------------------------------
225 // Do the accounting for our stripes
226 //---------------------------------------------------------------------
227 size_t missingcnt = 0, validcnt = 0, loadingcnt = 0, recoveringcnt = 0;
228 std::for_each( self->state.begin(), self->state.end(), [&]( state_t &s )
229 {
230 switch( s )
231 {
232 case Missing: ++missingcnt; break;
233 case Valid: ++validcnt; break;
234 case Loading: ++loadingcnt; break;
235 case Recovering: ++recoveringcnt; break;
236 default: ;
237 }
238 } );
239 //---------------------------------------------------------------------
240 // If there are no missing stripes all is good ...
241 //---------------------------------------------------------------------
242 if( missingcnt + recoveringcnt == 0 ) return true;
243 //---------------------------------------------------------------------
244 // Check if we can do the recovery at all (if too many stripes are
245 // missing it won't be possible)
246 //---------------------------------------------------------------------
247 if( missingcnt + recoveringcnt > self->objcfg.nbparity )
248 {
249 std::for_each( self->state.begin(), self->state.end(),
250 []( state_t &s ){ if( s == Recovering ) s = Missing; } );
251 return false;
252 }
253 //---------------------------------------------------------------------
254 // Check if we can do the recovery right away
255 //---------------------------------------------------------------------
256 if( validcnt >= self->objcfg.nbdata )
257 {
258 Config &cfg = Config::Instance();
259 stripes_t strps( self->get_stripes() );
260 try
261 {
262 cfg.GetRedundancy( self->objcfg ).compute( strps );
263 }
264 catch( const IOError &ex )
265 {
266 std::for_each( self->state.begin(), self->state.end(),
267 []( state_t &s ){ if( s == Recovering ) s = Missing; } );
268 return false;
269 }
270 //-------------------------------------------------------------------
271 // Now when we recovered the data we need to mark every stripe as
272 // valid and execute the pending reads
273 //-------------------------------------------------------------------
274 for( size_t strpid = 0; strpid < self->objcfg.nbchunks; ++strpid )
275 {
276 if( self->state[strpid] != Recovering ) continue;
277 self->state[strpid] = Valid;
278 self->carryout( self->pending[strpid], self->stripes[strpid] );
279 }
280 return true;
281 }
282 //---------------------------------------------------------------------
283 // Try loading the data and only then attempt recovery
284 //---------------------------------------------------------------------
285 size_t i = 0;
286 while( loadingcnt + validcnt < self->objcfg.nbdata && i < self->objcfg.nbchunks )
287 {
288 size_t strpid = i++;
289 if( self->state[strpid] != Empty ) continue;
290 self->reader.Read( self->blkid, strpid, self->stripes[strpid],
291 read_callback( self, strpid ) );
292 self->state[strpid] = Loading;
293 ++loadingcnt;
294 }
295
296 //-------------------------------------------------------------------
297 // Now that we triggered the recovery procedure mark every missing
298 // stripe as recovering.
299 //-------------------------------------------------------------------
300 std::for_each( self->state.begin(), self->state.end(),
301 []( state_t &s ){ if( s == Missing ) s = Recovering; } );
302 return true;
303 }
static Config & Instance()
Singleton access.
XrdCmsConfig Config
std::vector< stripe_t > stripes_t
All stripes in a block.
static callback_t read_callback(std::shared_ptr< block_t > &self, size_t strpid)

Referenced by read(), and read_callback().

Here is the caller graph for this function:

◆ fail_missing()

void XrdEc::block_t::fail_missing ( )
inline

Definition at line 402 of file XrdEcReader.cc.

403 {
404 size_t size = objcfg.nbchunks;
405 for( size_t i = 0; i < size; ++i )
406 {
407 if( state[i] != Missing ) continue;
408 carryout( pending[i], stripes[i],
409 XrdCl::XRootDStatus( XrdCl::stError, XrdCl::errDataError ) );
410 }
411 }
const uint16_t stError
An error occurred that could potentially be retried.
const uint16_t errDataError
data is corrupted
void carryout(pending_t &pending, const buffer_t &stripe, const XrdCl::XRootDStatus &st=XrdCl::XRootDStatus())

References carryout(), XrdCl::errDataError, Missing, objcfg, pending, state, XrdCl::stError, and stripes.

Here is the call graph for this function:

◆ get_stripes()

stripes_t XrdEc::block_t::get_stripes ( )
inline

Definition at line 337 of file XrdEcReader.cc.

338 {
339 stripes_t ret;
340 ret.reserve( objcfg.nbchunks );
341 for( size_t i = 0; i < objcfg.nbchunks; ++i )
342 {
343 if( state[i] == Valid )
344 ret.emplace_back( stripes[i].data(), true );
345 else
346 {
347 stripes[i].resize( objcfg.chunksize, 0 );
348 ret.emplace_back( stripes[i].data(), false );
349 }
350 }
351 return ret;
352 }

References objcfg, state, stripes, and Valid.

◆ read()

void XrdEc::block_t::read ( std::shared_ptr< block_t > & self,
size_t strpid,
uint64_t offset,
uint32_t size,
char * usrbuff,
callback_t usrcb,
uint16_t timeout )
inlinestatic

Definition at line 152 of file XrdEcReader.cc.

159 {
160 std::unique_lock<std::mutex> lck( self->mtx );
161
162 //---------------------------------------------------------------------
163 // The cache is empty, we need to load the data
164 //---------------------------------------------------------------------
165 if( self->state[strpid] == Empty )
166 {
167 self->reader.Read( self->blkid, strpid, self->stripes[strpid],
168 read_callback( self, strpid ), timeout );
169 self->state[strpid] = Loading;
170 }
171 //---------------------------------------------------------------------
172 // The stripe is either corrupted or unreachable
173 //---------------------------------------------------------------------
174 if( self->state[strpid] == Missing )
175 {
176 if( !error_correction( self ) )
177 {
178 //-----------------------------------------------------------------
179 // Recovery was not possible, notify the user of the error
180 //-----------------------------------------------------------------
181 usrcb( XrdCl::XRootDStatus( XrdCl::stError, XrdCl::errDataError ), 0 );
182 return;
183 }
184 //-------------------------------------------------------------------
185 // we fall through to the following if-statements that will handle
186 // Recovering / Valid state
187 //-------------------------------------------------------------------
188 }
189 //---------------------------------------------------------------------
190 // The cache is loading or recovering, we don't have the data yet
191 //---------------------------------------------------------------------
192 if( self->state[strpid] == Loading || self->state[strpid] == Recovering )
193 {
194 self->pending[strpid].emplace_back( offset, size, usrbuff, usrcb );
195 return;
196 }
197 //---------------------------------------------------------------------
198 // We do have the data so we can serve the user right away
199 //---------------------------------------------------------------------
200 if( self->state[strpid] == Valid )
201 {
202 if( offset + size > self->stripes[strpid].size() )
203 size = self->stripes[strpid].size() - offset;
204 if(usrbuff)
205 memcpy( usrbuff, self->stripes[strpid].data() + offset, size );
206 usrcb( XrdCl::XRootDStatus(), size );
207 return;
208 }
209 //---------------------------------------------------------------------
210 // In principle we should never end up here, nevertheless if this
211 // happens it is clearly an error ...
212 //---------------------------------------------------------------------
213 usrcb( XrdCl::XRootDStatus( XrdCl::stError, XrdCl::errInvalidOp ), 0 );
214 }
const uint16_t errInvalidOp
static bool error_correction(std::shared_ptr< block_t > &self)

References Empty, XrdCl::errDataError, XrdCl::errInvalidOp, error_correction(), Loading, Missing, read_callback(), Recovering, XrdCl::stError, and Valid.

Referenced by XrdEc::Reader::Read().

Here is the call graph for this function:
Here is the caller graph for this function:

◆ read_callback()

callback_t XrdEc::block_t::read_callback ( std::shared_ptr< block_t > & self,
size_t strpid )
inlinestatic

Definition at line 309 of file XrdEcReader.cc.

310 {
311 return [self, strpid]( const XrdCl::XRootDStatus &st, uint32_t ) mutable
312 {
313 std::unique_lock<std::mutex> lck( self->mtx );
314 self->state[strpid] = st.IsOK() ? Valid : Missing;
315 //------------------------------------------------------------
316 // Check if we need to do any error correction (either for
317 // the current stripe, or any other stripe)
318 //------------------------------------------------------------
319 bool recoverable = error_correction( self );
320 //------------------------------------------------------------
321 // Carry out the pending read requests if we got the data
322 //------------------------------------------------------------
323 if( st.IsOK() )
324 self->carryout( self->pending[strpid], self->stripes[strpid], st );
325 //------------------------------------------------------------
326 // Carry out the pending read requests if there was an error
327 // and we cannot recover
328 //------------------------------------------------------------
329 if( !recoverable )
330 self->fail_missing();
331 };
332 }

References error_correction(), Missing, and Valid.

Referenced by read().

Here is the call graph for this function:
Here is the caller graph for this function:

Member Data Documentation

◆ blkid

size_t XrdEc::block_t::blkid

Definition at line 418 of file XrdEcReader.cc.

Referenced by block_t().

◆ mtx

std::mutex XrdEc::block_t::mtx

Definition at line 420 of file XrdEcReader.cc.

◆ objcfg

ObjCfg& XrdEc::block_t::objcfg

Definition at line 414 of file XrdEcReader.cc.

Referenced by block_t(), fail_missing(), and get_stripes().

◆ pending

std::vector<pending_t> XrdEc::block_t::pending

Definition at line 417 of file XrdEcReader.cc.

Referenced by block_t(), carryout(), and fail_missing().

◆ reader

Reader& XrdEc::block_t::reader

Definition at line 413 of file XrdEcReader.cc.

Referenced by block_t().

◆ recovering

bool XrdEc::block_t::recovering

Definition at line 419 of file XrdEcReader.cc.

Referenced by block_t().

◆ state

std::vector<state_t> XrdEc::block_t::state

Definition at line 416 of file XrdEcReader.cc.

Referenced by block_t(), fail_missing(), and get_stripes().

◆ stripes

std::vector<buffer_t> XrdEc::block_t::stripes

Definition at line 415 of file XrdEcReader.cc.

Referenced by block_t(), fail_missing(), and get_stripes().


The documentation for this struct was generated from the following file: