XRootD
Loading...
Searching...
No Matches
XrdPfc::Cache Class Reference

Attaches/creates and detaches/deletes cache-io objects for disk based cache. More...

#include <XrdPfc.hh>

+ Inheritance diagram for XrdPfc::Cache:
+ Collaboration diagram for XrdPfc::Cache:

Public Member Functions

 Cache (XrdSysLogger *logger, XrdOucEnv *env)
 Constructor.
 
void AddWriteTask (Block *b, bool from_read)
 Add downloaded block in write queue.
 
virtual XrdOucCacheIOAttach (XrdOucCacheIO *, int Options=0)
 
void ClearPurgeProtectedSet ()
 
bool Config (const char *config_filename, const char *parameters)
 Parse configuration file.
 
virtual int ConsiderCached (const char *url)
 
bool Decide (XrdOucCacheIO *)
 Makes decision if the original XrdOucCacheIO should be cached.
 
bool DecideIfConsideredCached (long long file_size, long long bytes_on_disk)
 
void DeRegisterPrefetchFile (File *)
 
long long DetermineFullFileSize (const std::string &cinfo_fname)
 
void ExecuteCommandUrl (const std::string &command_url)
 
void FileSyncDone (File *, bool high_debug)
 
FileGetFile (const std::string &, IO *, long long off=0, long long filesize=0)
 
XrdXrootdGStreamGetGStream ()
 
XrdSysErrorGetLog ()
 
FileGetNextFileToPrefetch ()
 
XrdOssGetOss () const
 
PurgePinGetPurgePin () const
 
XrdSysTraceGetTrace ()
 
bool IsFileActiveOrPurgeProtected (const std::string &) const
 
virtual int LocalFilePath (const char *url, char *buff=0, int blen=0, LFP_Reason why=ForAccess, bool forall=false)
 
void Prefetch ()
 
virtual int Prepare (const char *url, int oflags, mode_t mode)
 
void ProcessWriteTasks ()
 Separate task which writes blocks from ram to disk.
 
const ConfigurationRefConfiguration () const
 Reference XrdPfc configuration.
 
ResourceMonitorRefResMon ()
 
void RegisterPrefetchFile (File *)
 
void ReleaseFile (File *, IO *)
 
void ReleaseRAM (char *buf, long long size)
 
void RemoveWriteQEntriesFor (File *f)
 Remove blocks from write queue which belong to given prefetch. This method is used at the time of File destruction.
 
char * RequestRAM (long long size)
 
void ScheduleFileSync (File *f)
 
virtual int Stat (const char *url, struct stat &sbuff)
 
virtual int Unlink (const char *url)
 
int UnlinkFile (const std::string &f_name, bool fail_if_open)
 Remove cinfo and data files from cache.
 
void WriteFileSizeXAttr (int cinfo_fd, long long file_size)
 
long long WritesSinceLastCall ()
 
- Public Member Functions inherited from XrdOucCache
 XrdOucCache (const char *ctype)
 
virtual ~XrdOucCache ()
 Destructor.
 
virtual int Rename (const char *oldp, const char *newp)
 
virtual int Rmdir (const char *dirp)
 
virtual int Truncate (const char *path, off_t size)
 
virtual int Xeq (XeqCmd cmd, char *arg, int arglen)
 

Static Public Member Functions

static const ConfigurationConf ()
 
static CacheCreateInstance (XrdSysLogger *logger, XrdOucEnv *env)
 Singleton creation.
 
static CacheGetInstance ()
 Singleton access.
 
static ResourceMonitorResMon ()
 
static const CacheTheOne ()
 
static bool VCheck (XrdVersionInfo &urVersion)
 Version check.
 

Static Public Attributes

static XrdSchedulerschedP = nullptr
 
- Static Public Attributes inherited from XrdOucCache
static const int optFIS = 0x0001
 File is structured (e.g. root file)
 
static const int optNEW = 0x0014
 File is new -> optRW (o/w read or write)
 
static const int optRW = 0x0004
 File is read/write (o/w read/only)
 
static const int optWIN = 0x0024
 File is new -> optRW use write-in cache.
 

Additional Inherited Members

- Public Types inherited from XrdOucCache
enum  LFP_Reason {
  ForAccess =0 ,
  ForInfo ,
  ForPath
}
 
enum  XeqCmd { xeqNoop = 0 }
 
- Public Attributes inherited from XrdOucCache
const char CacheType [8]
 A 1-to-7 character cache type identifier (usually pfc or rmc).
 
XrdOucCacheStats Statistics
 

Detailed Description

Attaches/creates and detaches/deletes cache-io objects for disk based cache.

Definition at line 151 of file XrdPfc.hh.

Constructor & Destructor Documentation

◆ Cache()

Cache::Cache ( XrdSysLogger * logger,
XrdOucEnv * env )

Constructor.

Definition at line 158 of file XrdPfc.cc.

158 :
159 XrdOucCache("pfc"),
160 m_env(env),
161 m_log(logger, "XrdPfc_"),
162 m_trace(new XrdSysTrace("XrdPfc", logger)),
163 m_traceID("Cache"),
164 m_oss(0),
165 m_gstream(0),
166 m_purge_pin(0),
167 m_prefetch_condVar(0),
168 m_prefetch_enabled(false),
169 m_RAM_used(0),
170 m_RAM_write_queue(0),
171 m_RAM_std_size(0),
172 m_isClient(false),
173 m_active_cond(0)
174{
175 // Default log level is Warning.
176 m_trace->What = 2;
177}
XrdOucCache(const char *ctype)

References XrdSysTrace::What.

Referenced by CreateInstance().

+ Here is the caller graph for this function:

Member Function Documentation

◆ AddWriteTask()

void Cache::AddWriteTask ( Block * b,
bool from_read )

Add downloaded block in write queue.

Definition at line 221 of file XrdPfc.cc.

222{
223 TRACE(Dump, "AddWriteTask() offset=" << b->m_offset << ". file " << b->get_file()->GetLocalPath());
224
225 {
226 XrdSysMutexHelper lock(&m_RAM_mutex);
227 m_RAM_write_queue += b->get_size();
228 }
229
230 m_writeQ.condVar.Lock();
231 if (fromRead)
232 m_writeQ.queue.push_back(b);
233 else
234 m_writeQ.queue.push_front(b);
235 m_writeQ.size++;
236 m_writeQ.condVar.Signal();
237 m_writeQ.condVar.UnLock();
238}
#define TRACE(act, x)
Definition XrdTrace.hh:63
int get_size() const
long long m_offset
File * get_file() const
const std::string & GetLocalPath() const

References XrdPfc::Block::get_file(), XrdPfc::Block::get_size(), XrdPfc::File::GetLocalPath(), XrdSysCondVar::Lock(), XrdPfc::Block::m_offset, XrdSysCondVar::Signal(), TRACE, and XrdSysCondVar::UnLock().

+ Here is the call graph for this function:

◆ Attach()

XrdOucCacheIO * Cache::Attach ( XrdOucCacheIO * io,
int Options = 0 )
virtual

Implements XrdOucCache.

Definition at line 179 of file XrdPfc.cc.

180{
181 const char* tpfx = "Attach() ";
182
183 if (Cache::GetInstance().Decide(io))
184 {
185 TRACE(Info, tpfx << obfuscateAuth(io->Path()));
186
187 IO *cio;
188
189 if (Cache::GetInstance().RefConfiguration().m_hdfsmode)
190 {
191 cio = new IOFileBlock(io, *this);
192 }
193 else
194 {
195 IOFile *iof = new IOFile(io, *this);
196
197 if ( ! iof->HasFile())
198 {
199 delete iof;
200 // TODO - redirect instead. But this is kind of an awkward place for it.
201 // errno is set during IOFile construction.
202 TRACE(Error, tpfx << "Failed opening local file, falling back to remote access " << io->Path());
203 return io;
204 }
205
206 cio = iof;
207 }
208
209 TRACE_PC(Debug, const char* loc = io->Location(), tpfx << io->Path() << " location: " <<
210 ((loc && loc[0] != 0) ? loc : "<deferred open>"));
211
212 return cio;
213 }
214 else
215 {
216 TRACE(Info, tpfx << "decision decline " << io->Path());
217 }
218 return io;
219}
std::string obfuscateAuth(const std::string &input)
#define TRACE_PC(act, pre_code, x)
bool Debug
virtual const char * Path()=0
virtual const char * Location(bool refresh=false)
const Configuration & RefConfiguration() const
Reference XrdPfc configuration.
Definition XrdPfc.hh:204
static Cache & GetInstance()
Singleton access.
Definition XrdPfc.cc:132
bool Decide(XrdOucCacheIO *)
Makes decision if the original XrdOucCacheIO should be cached.
Definition XrdPfc.cc:137
Downloads original file into multiple files, chunked into blocks. Only blocks that are asked for are ...
Downloads original file into a single file on local disk. Handles read requests as they come along.
bool HasFile() const
Check if File was opened successfully.
Base cache-io class that implements some XrdOucCacheIO abstract methods.
Definition XrdPfcIO.hh:16
Status of cached file. Can be read from and written into a binary file.
Definition XrdPfcInfo.hh:41

References Debug, Decide(), Error, GetInstance(), XrdPfc::IOFile::HasFile(), XrdOucCacheIO::Location(), obfuscateAuth(), XrdOucCacheIO::Path(), RefConfiguration(), TRACE, and TRACE_PC.

+ Here is the call graph for this function:

◆ ClearPurgeProtectedSet()

void Cache::ClearPurgeProtectedSet ( )

Definition at line 684 of file XrdPfc.cc.

685{
686 XrdSysCondVarHelper lock(&m_active_cond);
687 m_purge_delay_set.clear();
688}

Referenced by XrdPfc::ResourceMonitor::perform_purge_check(), and XrdPfc::ResourceMonitor::perform_purge_task_cleanup().

+ Here is the caller graph for this function:

◆ Conf()

const Configuration & Cache::Conf ( )
static

Definition at line 134 of file XrdPfc.cc.

134{ return m_instance->RefConfiguration(); }

References RefConfiguration().

Referenced by XrdPfc::ResourceMonitor::heart_beat(), XrdPfc::OldStylePurgeDriver(), XrdPfc::ResourceMonitor::perform_purge_check(), Proto_ResourceMonitorHeartBeat(), XrdPfc::ResourceMonitor::update_vs_and_file_usage_info(), and XrdPfc::DataFsSnapshot::write_json_file().

+ Here is the call graph for this function:
+ Here is the caller graph for this function:

◆ Config()

bool Cache::Config ( const char * config_filename,
const char * parameters )

Parse configuration file.

Parameters
config_filenamepath to configuration file
parametersoptional parameters to be passed
Returns
parse status

Definition at line 408 of file XrdPfcConfiguration.cc.

409{
410 // Indicate whether or not we are a client instance
411 const char *theINS = getenv("XRDINSTANCE");
412 m_isClient = (theINS != 0 && strncmp("*client ", theINS, 8) == 0);
413
414 // Tell everyone else we are a caching proxy
415 XrdOucEnv::Export("XRDPFC", 1);
416
417 XrdOucEnv myEnv;
418 XrdOucStream Config(&m_log, theINS, &myEnv, "=====> ");
419
420 if (! config_filename || ! *config_filename)
421 {
422 TRACE(Error, "Config() configuration file not specified.");
423 return false;
424 }
425
426 int fd;
427 if ( (fd = open(config_filename, O_RDONLY, 0)) < 0)
428 {
429 TRACE( Error, "Config() can't open configuration file " << config_filename);
430 return false;
431 }
432
433 Config.Attach(fd);
434 static const char *cvec[] = { "*** pfc plugin config:", 0 };
435 Config.Capture(cvec);
436
437 // Obtain OFS configurator for OSS plugin.
438 XrdOfsConfigPI *ofsCfg = XrdOfsConfigPI::New(config_filename,&Config,&m_log,
439 &XrdVERSIONINFOVAR(XrdOucGetCache));
440 if (! ofsCfg) return false;
441
442 TmpConfiguration tmpc;
443
444 // Adjust default parameters for client/serverless caching
445 if (m_isClient)
446 {
447 m_configuration.m_bufferSize = 128 * 1024; // same as normal.
448 m_configuration.m_wqueue_blocks = 8;
449 m_configuration.m_wqueue_threads = 1;
450 }
451
452 // If network checksum processing is the default, indicate so.
453 if (m_configuration.is_cschk_net()) m_env->Put("psx.CSNet", m_configuration.m_cs_ChkTLS ? "2" : "1");
454
455 // Actual parsing of the config file.
456 bool retval = true, aOK = true;
457 char *var;
458 while ((var = Config.GetMyFirstWord()))
459 {
460 if (! strcmp(var,"pfc.osslib"))
461 {
462 retval = ofsCfg->Parse(XrdOfsConfigPI::theOssLib);
463 }
464 else if (! strcmp(var,"pfc.cschk"))
465 {
466 retval = xcschk(Config);
467 }
468 else if (! strcmp(var,"pfc.decisionlib"))
469 {
470 retval = xdlib(Config);
471 }
472 else if (! strcmp(var,"pfc.purgelib"))
473 {
474 retval = xplib(Config);
475 }
476 else if (! strcmp(var,"pfc.trace"))
477 {
478 retval = xtrace(Config);
479 }
480 else if (! strcmp(var,"pfc.allow_xrdpfc_command"))
481 {
482 m_configuration.m_allow_xrdpfc_command = true;
483 }
484 else if (! strncmp(var,"pfc.", 4))
485 {
486 retval = ConfigParameters(std::string(var+4), Config, tmpc);
487 }
488
489 if ( ! retval)
490 {
491 TRACE(Error, "Config() error in parsing");
492 aOK = false;
493 }
494 }
495
496 Config.Close();
497
498 // Load OSS plugin.
499 myEnv.Put("oss.runmode", "pfc");
500 if (m_configuration.is_cschk_cache())
501 {
502 char csi_conf[128];
503 if (snprintf(csi_conf, 128, "space=%s nofill", m_configuration.m_meta_space.c_str()) < 128)
504 {
505 ofsCfg->Push(XrdOfsConfigPI::theOssLib, "libXrdOssCsi.so", csi_conf);
506 } else {
507 TRACE(Error, "Config() buffer too small for libXrdOssCsi params.");
508 return false;
509 }
510 }
511 if (ofsCfg->Load(XrdOfsConfigPI::theOssLib, &myEnv))
512 {
513 ofsCfg->Plugin(m_oss);
514 }
515 else
516 {
517 TRACE(Error, "Config() Unable to create an OSS object");
518 return false;
519 }
520
521 // Test if OSS is operational, determine optional features.
522 aOK &= test_oss_basics_and_features();
523
524 // sets default value for disk usage
525 XrdOssVSInfo sP;
526 {
527 if (m_configuration.m_meta_space != m_configuration.m_data_space &&
528 m_oss->StatVS(&sP, m_configuration.m_meta_space.c_str(), 1) < 0)
529 {
530 m_log.Emsg("ConfigParameters()", "error obtaining stat info for meta space ", m_configuration.m_meta_space.c_str());
531 return false;
532 }
533 if (m_configuration.m_meta_space != m_configuration.m_data_space && sP.Total < 10ll << 20)
534 {
535 m_log.Emsg("ConfigParameters()", "available data space is less than 10 MB (can be due to a mistake in oss.localroot directive) for space ",
536 m_configuration.m_meta_space.c_str());
537 return false;
538 }
539 if (m_oss->StatVS(&sP, m_configuration.m_data_space.c_str(), 1) < 0)
540 {
541 m_log.Emsg("ConfigParameters()", "error obtaining stat info for data space ", m_configuration.m_data_space.c_str());
542 return false;
543 }
544 if (sP.Total < 10ll << 20)
545 {
546 m_log.Emsg("ConfigParameters()", "available data space is less than 10 MB (can be due to a mistake in oss.localroot directive) for space ",
547 m_configuration.m_data_space.c_str());
548 return false;
549 }
550
551 m_configuration.m_diskTotalSpace = sP.Total;
552
553 if (cfg2bytes(tmpc.m_diskUsageLWM, m_configuration.m_diskUsageLWM, sP.Total, "lowWatermark") &&
554 cfg2bytes(tmpc.m_diskUsageHWM, m_configuration.m_diskUsageHWM, sP.Total, "highWatermark"))
555 {
556 if (m_configuration.m_diskUsageLWM >= m_configuration.m_diskUsageHWM) {
557 m_log.Emsg("ConfigParameters()", "pfc.diskusage should have lowWatermark < highWatermark.");
558 aOK = false;
559 }
560 }
561 else aOK = false;
562
563 if ( ! tmpc.m_fileUsageMax.empty())
564 {
565 if (cfg2bytes(tmpc.m_fileUsageBaseline, m_configuration.m_fileUsageBaseline, sP.Total, "files baseline") &&
566 cfg2bytes(tmpc.m_fileUsageNominal, m_configuration.m_fileUsageNominal, sP.Total, "files nominal") &&
567 cfg2bytes(tmpc.m_fileUsageMax, m_configuration.m_fileUsageMax, sP.Total, "files max"))
568 {
569 if (m_configuration.m_fileUsageBaseline >= m_configuration.m_fileUsageNominal ||
570 m_configuration.m_fileUsageBaseline >= m_configuration.m_fileUsageMax ||
571 m_configuration.m_fileUsageNominal >= m_configuration.m_fileUsageMax)
572 {
573 m_log.Emsg("ConfigParameters()", "pfc.diskusage files should have baseline < nominal < max.");
574 aOK = false;
575 }
576
577
578 if (aOK && m_configuration.m_fileUsageMax >= m_configuration.m_diskUsageLWM)
579 {
580 m_log.Emsg("ConfigParameters()", "pfc.diskusage files values must be below lowWatermark");
581 aOK = false;
582 }
583 }
584 else aOK = false;
585 }
586 }
587
588 // sets flush frequency
589 if ( ! tmpc.m_flushRaw.empty())
590 {
591 if (::isalpha(*(tmpc.m_flushRaw.rbegin())))
592 {
593 if (XrdOuca2x::a2sz(m_log, "Error getting number of bytes written before flush", tmpc.m_flushRaw.c_str(),
594 &m_configuration.m_flushCnt,
595 100 * m_configuration.m_bufferSize , 100000 * m_configuration.m_bufferSize))
596 {
597 return false;
598 }
599 m_configuration.m_flushCnt /= m_configuration.m_bufferSize;
600 }
601 else
602 {
603 if (XrdOuca2x::a2ll(m_log, "Error getting number of blocks written before flush", tmpc.m_flushRaw.c_str(),
604 &m_configuration.m_flushCnt, 100, 100000))
605 {
606 return false;
607 }
608 }
609 }
610
611 // get number of available RAM blocks after process configuration
612 if (m_configuration.m_RamAbsAvailable == 0)
613 {
614 m_configuration.m_RamAbsAvailable = m_isClient ? 256ll * 1024 * 1024 : 1024ll * 1024 * 1024;
615 char buff[1024];
616 snprintf(buff, sizeof(buff), "RAM usage pfc.ram is not specified. Default value %s is used.", m_isClient ? "256m" : "1g");
617 m_log.Say("Config info: ", buff);
618 }
619 // Setup number of standard-size blocks not released back to the system to 5% of total RAM.
620 m_configuration.m_RamKeepStdBlocks = (m_configuration.m_RamAbsAvailable / m_configuration.m_bufferSize + 1) * 5 / 100;
621
622 // Set tracing to debug if this is set in environment
623 char* cenv = getenv("XRDDEBUG");
624 if (cenv && ! strcmp(cenv,"1") && m_trace->What < 4) m_trace->What = 4;
625
626 if (aOK)
627 {
628 int loff = 0;
629// 000 001 010
630 const char *csc[] = {"off", "cache nonet", "nocache net notls",
631// 011
632 "cache net notls",
633// 100 101 110
634 "off", "cache nonet", "nocache net tls",
635// 111
636 "cache net tls"};
637 char buff[8192], uvk[32];
638 if (m_configuration.m_cs_UVKeep < 0)
639 strcpy(uvk, "lru");
640 else
641 sprintf(uvk, "%lld", (long long) m_configuration.m_cs_UVKeep);
642 float rg = (m_configuration.m_RamAbsAvailable) / float(1024*1024*1024);
643 loff = snprintf(buff, sizeof(buff), "Config effective %s pfc configuration:\n"
644 " pfc.cschk %s uvkeep %s\n"
645 " pfc.blocksize %lld\n"
646 " pfc.prefetch %d\n"
647 " pfc.ram %.fg\n"
648 " pfc.writequeue %d %d\n"
649 " # Total available disk: %lld\n"
650 " pfc.diskusage %lld %lld files %lld %lld %lld purgeinterval %d purgecoldfiles %d\n"
651 " pfc.spaces %s %s\n"
652 " pfc.trace %d\n"
653 " pfc.flush %lld\n"
654 " pfc.acchistorysize %d\n"
655 " pfc.onlyIfCachedMinBytes %lld\n"
656 " pfc.onlyIfCachedMinFrac %.2f\n",
657 config_filename,
658 csc[int(m_configuration.m_cs_Chk)], uvk,
659 m_configuration.m_bufferSize,
660 m_configuration.m_prefetch_max_blocks,
661 rg,
662 m_configuration.m_wqueue_blocks, m_configuration.m_wqueue_threads,
663 sP.Total,
664 m_configuration.m_diskUsageLWM, m_configuration.m_diskUsageHWM,
665 m_configuration.m_fileUsageBaseline, m_configuration.m_fileUsageNominal, m_configuration.m_fileUsageMax,
666 m_configuration.m_purgeInterval, m_configuration.m_purgeColdFilesAge,
667 m_configuration.m_data_space.c_str(),
668 m_configuration.m_meta_space.c_str(),
669 m_trace->What,
670 m_configuration.m_flushCnt,
671 m_configuration.m_accHistorySize,
672 m_configuration.m_onlyIfCachedMinSize,
673 m_configuration.m_onlyIfCachedMinFrac);
674
675 if (m_configuration.is_dir_stat_reporting_on())
676 {
677 loff += snprintf(buff + loff, sizeof(buff) - loff,
678 " pfc.dirstats interval %d maxdepth %d ((internal: store_depth %d, size_of_dirlist %d, size_of_globlist %d))\n",
679 m_configuration.m_dirStatsInterval, m_configuration.m_dirStatsMaxDepth, m_configuration.m_dirStatsStoreDepth,
680 (int) m_configuration.m_dirStatsDirs.size(), (int) m_configuration.m_dirStatsDirGlobs.size());
681 loff += snprintf(buff + loff, sizeof(buff) - loff, " dirlist:\n");
682 for (std::set<std::string>::iterator i = m_configuration.m_dirStatsDirs.begin(); i != m_configuration.m_dirStatsDirs.end(); ++i)
683 loff += snprintf(buff + loff, sizeof(buff) - loff, " %s\n", i->c_str());
684 loff += snprintf(buff + loff, sizeof(buff) - loff, " globlist:\n");
685 for (std::set<std::string>::iterator i = m_configuration.m_dirStatsDirGlobs.begin(); i != m_configuration.m_dirStatsDirGlobs.end(); ++i)
686 loff += snprintf(buff + loff, sizeof(buff) - loff, " %s/*\n", i->c_str());
687 }
688
689 if (m_configuration.m_hdfsmode)
690 {
691 loff += snprintf(buff + loff, sizeof(buff) - loff, " pfc.hdfsmode hdfsbsize %lld\n", m_configuration.m_hdfsbsize);
692 }
693
694 if (m_configuration.m_username.empty())
695 {
696 char unameBuff[256];
697 XrdOucUtils::UserName(getuid(), unameBuff, sizeof(unameBuff));
698 m_configuration.m_username = unameBuff;
699 }
700 else
701 {
702 loff += snprintf(buff + loff, sizeof(buff) - loff, " pfc.user %s\n", m_configuration.m_username.c_str());
703 }
704
705 m_log.Say(buff);
706
707 m_env->Put("XRDPFC.SEGSIZE", std::to_string(m_configuration.m_bufferSize).c_str());
708 }
709
710 // Derived settings
711 m_prefetch_enabled = m_configuration.m_prefetch_max_blocks > 0;
712 Info::s_maxNumAccess = m_configuration.m_accHistorySize;
713
714 m_gstream = (XrdXrootdGStream*) m_env->GetPtr("pfc.gStream*");
715
716 m_log.Say(" pfc g-stream has", m_gstream ? "" : " NOT", " been configured via xrootd.monitor directive\n");
717
718 // Create the ResourceMonitor and get it ready for starting the main thread function.
719 if (aOK)
720 {
721 m_res_mon = new ResourceMonitor(*m_oss);
722 m_res_mon->init_before_main();
723 }
724
725 m_log.Say("=====> Proxy file cache configuration parsing ", aOK ? "completed" : "failed");
726
727 if (ofsCfg) delete ofsCfg;
728
729 // XXXX-CKSUM Testing. To be removed after OssPgi is also merged and valildated.
730 // Building of xrdpfc_print fails when this is enabled.
731#ifdef XRDPFC_CKSUM_TEST
732 {
733 int xxx = m_configuration.m_cs_Chk;
734
735 for (m_configuration.m_cs_Chk = CSChk_None; m_configuration.m_cs_Chk <= CSChk_Both; ++m_configuration.m_cs_Chk)
736 {
737 Info::TestCksumStuff();
738 }
739
740 m_configuration.m_cs_Chk = xxx;
741 }
742#endif
743
744 return aOK;
745}
XrdOucCache * XrdOucGetCache(XrdSysLogger *logger, const char *config_filename, const char *parameters, XrdOucEnv *env)
Definition XrdPfc.cc:76
#define open
Definition XrdPosix.hh:76
bool Parse(TheLib what)
bool Plugin(XrdAccAuthorize *&piP)
Get Authorization plugin.
static XrdOfsConfigPI * New(const char *cfn, XrdOucStream *cfgP, XrdSysError *errP, XrdVersionInfo *verP=0, XrdSfsFileSystem *sfsP=0)
bool Load(int what, XrdOucEnv *envP=0)
bool Push(TheLib what, const char *plugP, const char *parmP=0)
@ theOssLib
Oss plugin.
long long Total
Definition XrdOssVS.hh:90
virtual int StatVS(XrdOssVSInfo *vsP, const char *sname=0, int updt=0)
Definition XrdOss.cc:117
static int Export(const char *Var, const char *Val)
Definition XrdOucEnv.cc:170
void * GetPtr(const char *varname)
Definition XrdOucEnv.cc:263
void Put(const char *varname, const char *value)
Definition XrdOucEnv.hh:85
static int UserName(uid_t uID, char *uName, int uNsz)
static int a2sz(XrdSysError &, const char *emsg, const char *item, long long *val, long long minv=-1, long long maxv=-1)
Definition XrdOuca2x.cc:257
static int a2ll(XrdSysError &, const char *emsg, const char *item, long long *val, long long minv=-1, long long maxv=-1)
Definition XrdOuca2x.cc:70
bool Config(const char *config_filename, const char *parameters)
Parse configuration file.
static size_t s_maxNumAccess
int Emsg(const char *esfx, int ecode, const char *text1, const char *text2=0)
void Say(const char *text1, const char *text2=0, const char *txt3=0, const char *text4=0, const char *text5=0, const char *txt6=0)
long long m_hdfsbsize
used with m_hdfsmode, default 128MB
Definition XrdPfc.hh:115
long long m_RamAbsAvailable
available from configuration
Definition XrdPfc.hh:109
long long m_flushCnt
nuber of unsynced blcoks on disk before flush is called
Definition XrdPfc.hh:116
int m_accHistorySize
max number of entries in access history part of cinfo file
Definition XrdPfc.hh:100
int m_wqueue_threads
number of threads writing blocks to disk
Definition XrdPfc.hh:112
long long m_diskTotalSpace
total disk space on configured partition or oss space
Definition XrdPfc.hh:91
long long m_fileUsageMax
cache purge - files usage maximum
Definition XrdPfc.hh:96
long long m_fileUsageBaseline
cache purge - files usage baseline
Definition XrdPfc.hh:94
int m_dirStatsStoreDepth
depth to which statistics should be collected
Definition XrdPfc.hh:106
bool m_allow_xrdpfc_command
flag for enabling access to /xrdpfc-command/ functionality.
Definition XrdPfc.hh:85
long long m_diskUsageHWM
cache purge - disk usage high water mark
Definition XrdPfc.hh:93
bool is_cschk_cache() const
Definition XrdPfc.hh:75
std::set< std::string > m_dirStatsDirGlobs
directory globs for which stat reporting was requested
Definition XrdPfc.hh:103
int m_prefetch_max_blocks
maximum number of blocks to prefetch per file
Definition XrdPfc.hh:113
bool m_cs_ChkTLS
Allow TLS.
Definition XrdPfc.hh:120
long long m_fileUsageNominal
cache purge - files usage nominal
Definition XrdPfc.hh:95
int m_cs_Chk
Checksum check.
Definition XrdPfc.hh:119
bool m_hdfsmode
flag for enabling block-level operation
Definition XrdPfc.hh:84
int m_purgeColdFilesAge
purge files older than this age
Definition XrdPfc.hh:98
std::string m_data_space
oss space for data files
Definition XrdPfc.hh:88
std::set< std::string > m_dirStatsDirs
directories for which stat reporting was requested
Definition XrdPfc.hh:102
long long m_diskUsageLWM
cache purge - disk usage low water mark
Definition XrdPfc.hh:92
int m_RamKeepStdBlocks
number of standard-sized blocks kept after release
Definition XrdPfc.hh:110
long long m_bufferSize
prefetch buffer size, default 1MB
Definition XrdPfc.hh:108
int m_dirStatsInterval
time between resource monitor statistics dump in seconds
Definition XrdPfc.hh:104
std::string m_meta_space
oss space for metadata files (cinfo)
Definition XrdPfc.hh:89
int m_wqueue_blocks
maximum number of blocks written per write-queue loop
Definition XrdPfc.hh:111
std::string m_username
username passed to oss plugin
Definition XrdPfc.hh:87
bool is_cschk_net() const
Definition XrdPfc.hh:76
double m_onlyIfCachedMinFrac
minimum fraction of downloaded file, used by only-if-cached CGI option
Definition XrdPfc.hh:123
time_t m_cs_UVKeep
unverified checksum cache keep
Definition XrdPfc.hh:118
int m_dirStatsMaxDepth
maximum depth for statistics write out
Definition XrdPfc.hh:105
int m_purgeInterval
sleep interval between cache purges
Definition XrdPfc.hh:97
long long m_onlyIfCachedMinSize
minumum size of downloaded file, used by only-if-cached CGI option
Definition XrdPfc.hh:122
bool is_dir_stat_reporting_on() const
Definition XrdPfc.hh:70
std::string m_diskUsageLWM
Definition XrdPfc.hh:130
std::string m_diskUsageHWM
Definition XrdPfc.hh:131
std::string m_fileUsageBaseline
Definition XrdPfc.hh:132
std::string m_fileUsageNominal
Definition XrdPfc.hh:133
std::string m_flushRaw
Definition XrdPfc.hh:135
std::string m_fileUsageMax
Definition XrdPfc.hh:134

References XrdOuca2x::a2ll(), XrdOuca2x::a2sz(), Config(), XrdPfc::CSChk_Both, XrdPfc::CSChk_None, XrdSysError::Emsg(), Error, XrdOucEnv::Export(), XrdOucEnv::GetPtr(), XrdPfc::ResourceMonitor::init_before_main(), XrdPfc::Configuration::is_cschk_cache(), XrdPfc::Configuration::is_cschk_net(), XrdPfc::Configuration::is_dir_stat_reporting_on(), XrdOfsConfigPI::Load(), XrdPfc::Configuration::m_accHistorySize, XrdPfc::Configuration::m_allow_xrdpfc_command, XrdPfc::Configuration::m_bufferSize, XrdPfc::Configuration::m_cs_Chk, XrdPfc::Configuration::m_cs_ChkTLS, XrdPfc::Configuration::m_cs_UVKeep, XrdPfc::Configuration::m_data_space, XrdPfc::Configuration::m_dirStatsDirGlobs, XrdPfc::Configuration::m_dirStatsDirs, XrdPfc::Configuration::m_dirStatsInterval, XrdPfc::Configuration::m_dirStatsMaxDepth, XrdPfc::Configuration::m_dirStatsStoreDepth, XrdPfc::Configuration::m_diskTotalSpace, XrdPfc::Configuration::m_diskUsageHWM, XrdPfc::TmpConfiguration::m_diskUsageHWM, XrdPfc::Configuration::m_diskUsageLWM, XrdPfc::TmpConfiguration::m_diskUsageLWM, XrdPfc::Configuration::m_fileUsageBaseline, XrdPfc::TmpConfiguration::m_fileUsageBaseline, XrdPfc::Configuration::m_fileUsageMax, XrdPfc::TmpConfiguration::m_fileUsageMax, XrdPfc::Configuration::m_fileUsageNominal, XrdPfc::TmpConfiguration::m_fileUsageNominal, XrdPfc::Configuration::m_flushCnt, XrdPfc::TmpConfiguration::m_flushRaw, XrdPfc::Configuration::m_hdfsbsize, XrdPfc::Configuration::m_hdfsmode, XrdPfc::Configuration::m_meta_space, XrdPfc::Configuration::m_onlyIfCachedMinFrac, XrdPfc::Configuration::m_onlyIfCachedMinSize, XrdPfc::Configuration::m_prefetch_max_blocks, XrdPfc::Configuration::m_purgeColdFilesAge, XrdPfc::Configuration::m_purgeInterval, XrdPfc::Configuration::m_RamAbsAvailable, XrdPfc::Configuration::m_RamKeepStdBlocks, XrdPfc::Configuration::m_username, XrdPfc::Configuration::m_wqueue_blocks, XrdPfc::Configuration::m_wqueue_threads, XrdOfsConfigPI::New(), open, XrdOfsConfigPI::Parse(), XrdOfsConfigPI::Plugin(), XrdOfsConfigPI::Push(), XrdOucEnv::Put(), XrdPfc::Info::s_maxNumAccess, XrdSysError::Say(), XrdOss::StatVS(), XrdOfsConfigPI::theOssLib, XrdOssVSInfo::Total, TRACE, XrdOucUtils::UserName(), XrdSysTrace::What, and XrdOucGetCache().

Referenced by Config(), and XrdOucGetCache().

+ Here is the call graph for this function:
+ Here is the caller graph for this function:

◆ ConsiderCached()

int Cache::ConsiderCached ( const char * curl)
virtual
Returns
0 - the file is complete and the local path to the file is in the buffer, if it has been supllied.
<0 - the request could not be fulfilled. The return value is -errno describing why.
>0 - Reserved for future use.

Definition at line 999 of file XrdPfc.cc.

1000{
1001 static const char* tpfx = "ConsiderCached ";
1002
1003 TRACE(Debug, tpfx << curl);
1004
1005 XrdCl::URL url(curl);
1006 std::string f_name = url.GetPath();
1007
1008 File *file = nullptr;
1009 {
1010 XrdSysCondVarHelper lock(&m_active_cond);
1011 auto it = m_active.find(f_name);
1012 if (it != m_active.end()) {
1013 file = it->second;
1014 // If the file-open is in progress, `file` is a nullptr
1015 // so we cannot increase the reference count. For now,
1016 // simply treat it as if the file open doesn't exist instead
1017 // of trying to wait and see if it succeeds.
1018 if (file) {
1019 inc_ref_cnt(file, false, false);
1020 }
1021 }
1022 }
1023 if (file) {
1024 struct stat sbuff;
1025 int res = file->Fstat(sbuff);
1026 dec_ref_cnt(file, false);
1027 if (res)
1028 return res;
1029 // DecideIfConsideredCached() already called in File::Fstat().
1030 return sbuff.st_atime > 0 ? 0 : -EREMOTE;
1031 }
1032
1033 struct stat sbuff;
1034 int res = m_oss->Stat(f_name.c_str(), &sbuff);
1035 if (res != XrdOssOK) {
1036 TRACE(Debug, tpfx << curl << " -> " << res);
1037 return res;
1038 }
1039 if (S_ISDIR(sbuff.st_mode))
1040 {
1041 TRACE(Debug, tpfx << curl << " -> EISDIR");
1042 return -EISDIR;
1043 }
1044
1045 long long file_size = DetermineFullFileSize(f_name + Info::s_infoExtension);
1046 if (file_size < 0) {
1047 TRACE(Debug, tpfx << curl << " -> " << file_size);
1048 return (int) file_size;
1049 }
1050 bool is_cached = DecideIfConsideredCached(file_size, sbuff.st_blocks * 512ll);
1051
1052 return is_cached ? 0 : -EREMOTE;
1053}
#define XrdOssOK
Definition XrdOss.hh:50
#define stat(a, b)
Definition XrdPosix.hh:101
URL representation.
Definition XrdClURL.hh:31
virtual int Stat(const char *path, struct stat *buff, int opts=0, XrdOucEnv *envP=0)=0
long long DetermineFullFileSize(const std::string &cinfo_fname)
Definition XrdPfc.cc:925
bool DecideIfConsideredCached(long long file_size, long long bytes_on_disk)
Definition XrdPfc.cc:966
int Fstat(struct stat &sbuff)
static const char * s_infoExtension

References Debug, DecideIfConsideredCached(), DetermineFullFileSize(), XrdPfc::File::Fstat(), XrdCl::URL::GetPath(), XrdPfc::Info::s_infoExtension, XrdOss::Stat(), stat, TRACE, and XrdOssOK.

Referenced by XrdPfcFSctl::FSctl().

+ Here is the call graph for this function:
+ Here is the caller graph for this function:

◆ CreateInstance()

Cache & Cache::CreateInstance ( XrdSysLogger * logger,
XrdOucEnv * env )
static

Singleton creation.

Definition at line 125 of file XrdPfc.cc.

126{
127 assert (m_instance == 0);
128 m_instance = new Cache(logger, env);
129 return *m_instance;
130}
Cache(XrdSysLogger *logger, XrdOucEnv *env)
Constructor.
Definition XrdPfc.cc:158

References Cache().

Referenced by XrdOucGetCache().

+ Here is the call graph for this function:
+ Here is the caller graph for this function:

◆ Decide()

bool Cache::Decide ( XrdOucCacheIO * io)

Makes decision if the original XrdOucCacheIO should be cached.

Parameters
&URL of file
Returns
decision if IO object will be cached.

Definition at line 137 of file XrdPfc.cc.

138{
139 if (! m_decisionpoints.empty())
140 {
141 XrdCl::URL url(io->Path());
142 std::string filename = url.GetPath();
143 std::vector<Decision*>::const_iterator it;
144 for (it = m_decisionpoints.begin(); it != m_decisionpoints.end(); ++it)
145 {
146 XrdPfc::Decision *d = *it;
147 if (! d) continue;
148 if (! d->Decide(filename, *m_oss))
149 {
150 return false;
151 }
152 }
153 }
154
155 return true;
156}
Base class for selecting which files should be cached.
virtual bool Decide(const std::string &, XrdOss &) const =0

References XrdPfc::Decision::Decide(), XrdCl::URL::GetPath(), and XrdOucCacheIO::Path().

Referenced by Attach().

+ Here is the call graph for this function:
+ Here is the caller graph for this function:

◆ DecideIfConsideredCached()

bool Cache::DecideIfConsideredCached ( long long file_size,
long long bytes_on_disk )

Definition at line 966 of file XrdPfc.cc.

967{
968 if (file_size == 0 || bytes_on_disk >= file_size)
969 return true;
970
971 double frac_on_disk = (double) bytes_on_disk / file_size;
972
973 if (file_size <= m_configuration.m_onlyIfCachedMinSize)
974 {
975 if (frac_on_disk >= m_configuration.m_onlyIfCachedMinFrac)
976 return true;
977 }
978 else
979 {
980 if (bytes_on_disk >= m_configuration.m_onlyIfCachedMinSize &&
981 frac_on_disk >= m_configuration.m_onlyIfCachedMinFrac)
982 return true;
983 }
984 return false;
985}

References XrdPfc::Configuration::m_onlyIfCachedMinFrac, and XrdPfc::Configuration::m_onlyIfCachedMinSize.

Referenced by ConsiderCached(), and Stat().

+ Here is the caller graph for this function:

◆ DeRegisterPrefetchFile()

void Cache::DeRegisterPrefetchFile ( File * file)

Definition at line 710 of file XrdPfc.cc.

711{
712 // Can be called with other locks held.
713
714 if ( ! m_prefetch_enabled)
715 {
716 return;
717 }
718
719 m_prefetch_condVar.Lock();
720 for (PrefetchList::iterator it = m_prefetchList.begin(); it != m_prefetchList.end(); ++it)
721 {
722 if (*it == file)
723 {
724 m_prefetchList.erase(it);
725 break;
726 }
727 }
728 m_prefetch_condVar.UnLock();
729}

References XrdSysCondVar::Lock(), and XrdSysCondVar::UnLock().

+ Here is the call graph for this function:

◆ DetermineFullFileSize()

long long Cache::DetermineFullFileSize ( const std::string & cinfo_fname)

Definition at line 925 of file XrdPfc.cc.

926{
927 if (m_metaXattr) {
928 char pfn[4096];
929 m_oss->Lfn2Pfn(cinfo_fname.c_str(), pfn, 4096);
930 long long fsize = -1ll;
931 int res = XrdSysXAttrActive->Get("pfc.fsize", &fsize, sizeof(long long), pfn);
932 if (res == sizeof(long long))
933 {
934 return fsize;
935 }
936 else
937 {
938 TRACE(Debug, "DetermineFullFileSize error getting xattr " << res);
939 }
940 }
941
942 XrdOssDF *infoFile = m_oss->newFile(m_configuration.m_username.c_str());
943 XrdOucEnv env;
944 long long ret;
945 int res = infoFile->Open(cinfo_fname.c_str(), O_RDONLY, 0600, env);
946 if (res < 0) {
947 ret = res;
948 } else {
949 Info info(m_trace, 0);
950 if ( ! info.Read(infoFile, cinfo_fname.c_str())) {
951 ret = -EBADF;
952 } else {
953 ret = info.GetFileSize();
954 }
955 infoFile->Close();
956 }
957 delete infoFile;
958 return ret;
959}
XrdSysXAttr * XrdSysXAttrActive
virtual int Close(long long *retsz=0)=0
virtual int Open(const char *path, int Oflag, mode_t Mode, XrdOucEnv &env)
Definition XrdOss.hh:200
virtual XrdOssDF * newFile(const char *tident)=0
virtual int Lfn2Pfn(const char *Path, char *buff, int blen)
Definition XrdOss.hh:873
virtual int Get(const char *Aname, void *Aval, int Avsz, const char *Path, int fd=-1)=0

References XrdOssDF::Close(), Debug, XrdSysXAttr::Get(), XrdPfc::Info::GetFileSize(), XrdOss::Lfn2Pfn(), XrdPfc::Configuration::m_username, XrdOss::newFile(), XrdOssDF::Open(), XrdPfc::Info::Read(), TRACE, and XrdSysXAttrActive.

Referenced by ConsiderCached(), and Stat().

+ Here is the call graph for this function:
+ Here is the caller graph for this function:

◆ ExecuteCommandUrl()

void Cache::ExecuteCommandUrl ( const std::string & command_url)

Definition at line 51 of file XrdPfcCommand.cc.

52{
53 static const char *top_epfx = "ExecuteCommandUrl ";
54
55 SplitParser cp(command_url, "/");
56
57 std::string token = cp.get_token();
58
59 if (token != "xrdpfc_command")
60 {
61 TRACE(Error, top_epfx << "First token is NOT xrdpfc_command.");
62 return;
63 }
64
65 // Get the command
66 token = cp.get_token_as_string();
67
68 auto get_opt = [](SplitParser &sp) -> char {
69 const char *t = sp.get_token();
70 if (t)
71 return (t[0] == '-' && t[1] != 0) ? t[1] : 0;
72 else
73 return -1;
74 };
75
76 //================================================================
77 // create_file
78 //================================================================
79
80 if (token == "create_file")
81 {
82 static const char* err_prefix = "ExecuteCommandUrl: /xrdpfc_command/create_file: ";
83 static const char* usage =
84 "Usage: create_file/ [-h] [-s filesize] [-b blocksize] [-t access_time] [-d access_duration]/<path>\n"
85 " Creates a cache file with given parameters. Data in file is random.\n"
86 " Useful for cache purge testing.\n"
87 "Notes:\n"
88 " . If no options are needed one should still leave a space between / separators, ie., '/ /'\n"
89 " . Default filesize=1G, blocksize=<as configured>, access_time=-10, access_duration=10.\n"
90 " . -t and -d can be given multiple times to record several accesses.\n"
91 " . Negative arguments given to -t are interpreted as relative to now.\n";
92
93 const Configuration &conf = m_configuration;
94
95 token = cp.get_token_as_string();
96 TRACE(Debug, err_prefix << "Entered with argument string '" << token <<"'.");
97 if (token.empty()) {
98 TRACE(Error, err_prefix << "Options section must not be empty, a single space character is OK.");
99 return;
100 }
101 TRACE(Debug, err_prefix << "File path (reminder of URL) is '" << cp.get_reminder() <<"'.");
102 if ( ! cp.has_reminder()) {
103 TRACE(Error, err_prefix << "Path section must not be empty.");
104 return;
105 }
106
107 long long file_size = ONE_GB;
108 long long block_size = conf.m_bufferSize;
109 int access_time [MAX_ACCESSES];
110 int access_duration[MAX_ACCESSES];
111 int at_count = 0, ad_count = 0;
112
113 time_t time_now = time(0);
114
115 SplitParser ap(token, " ");
116 char theOpt;
117
118 while ((theOpt = get_opt(ap)) != (char) -1)
119 {
120 switch (theOpt)
121 {
122 case 'h': {
123 m_log.Say(err_prefix, " -- printing help, no action will be taken\n", usage);
124 return;
125 }
126 case 's': {
127 if (XrdOuca2x::a2sz(m_log, "Error getting filesize", ap.get_token(),
128 &file_size, 0ll, 32 * ONE_GB))
129 return;
130 break;
131 }
132 case 'b': {
133 if (XrdOuca2x::a2sz(m_log, "Error getting blocksize", ap.get_token(),
134 &block_size, 0ll, 64 * ONE_MB))
135 return;
136 break;
137 }
138 case 't': {
139 if (XrdOuca2x::a2i(m_log, "Error getting access time", ap.get_token(),
140 &access_time[at_count++], INT_MIN, INT_MAX))
141 return;
142 break;
143 }
144 case 'd': {
145 if (XrdOuca2x::a2i(m_log, "Error getting access duration", ap.get_token(),
146 &access_duration[ad_count++], 0, 24 * 3600))
147 return;
148 break;
149 }
150 default: {
151 TRACE(Error, err_prefix << "Unhandled command argument.");
152 return;
153 }
154 }
155 }
156
157 if (at_count < 1) access_time [at_count++] = time_now - 10;
158 if (ad_count < 1) access_duration[ad_count++] = 10;
159
160 if (at_count != ad_count)
161 {
162 TRACE(Error, err_prefix << "Options -t and -d must be given the same number of times.");
163 return;
164 }
165
166 std::string file_path (cp.get_reminder_with_delim());
167 std::string cinfo_path(file_path + Info::s_infoExtension);
168
169 TRACE(Debug, err_prefix << "Command arguments parsed successfully. Proceeding to create file " << file_path);
170
171 // Check if cinfo exists ... bail out if it does.
172 {
173 struct stat infoStat;
174 if (GetOss()->Stat(cinfo_path.c_str(), &infoStat) == XrdOssOK)
175 {
176 TRACE(Error, err_prefix << "cinfo file already exists for '" << file_path << "'. Refusing to overwrite.");
177 return;
178 }
179 }
180
181 TRACE(Debug, err_prefix << "Command arguments parsed successfully, proceeding to execution.");
182
183 {
184 const char *myUser = conf.m_username.c_str();
185 XrdOucEnv myEnv;
186
187 // Create the data file.
188
189 char size_str[32]; sprintf(size_str, "%lld", file_size);
190 myEnv.Put("oss.asize", size_str);
191 myEnv.Put("oss.cgroup", conf.m_data_space.c_str());
192 int cret;
193 if ((cret = GetOss()->Create(myUser, file_path.c_str(), 0600, myEnv, XRDOSS_mkpath)) != XrdOssOK)
194 {
195 TRACE(Error, err_prefix << "Create failed for data file " << file_path << ERRNO_AND_ERRSTR(-cret));
196 return;
197 }
198
199 XrdOssDF *myFile = GetOss()->newFile(myUser);
200 if ((cret = myFile->Open(file_path.c_str(), O_RDWR, 0600, myEnv)) != XrdOssOK)
201 {
202 TRACE(Error, err_prefix << "Open failed for data file " << file_path << ERRNO_AND_ERRSTR(-cret));
203 delete myFile;
204 return;
205 }
206
207 // Create the info file.
208
209 myEnv.Put("oss.asize", "64k"); // TODO: Calculate? Get it from configuration? Do not know length of access lists ...
210 myEnv.Put("oss.cgroup", conf.m_meta_space.c_str());
211 if ((cret = GetOss()->Create(myUser, cinfo_path.c_str(), 0600, myEnv, XRDOSS_mkpath)) != XrdOssOK)
212 {
213 TRACE(Error, err_prefix << "Create failed for info file " << cinfo_path << ERRNO_AND_ERRSTR(-cret));
214 myFile->Close(); delete myFile;
215 return;
216 }
217
218 XrdOssDF *myInfoFile = GetOss()->newFile(myUser);
219 if ((cret = myInfoFile->Open(cinfo_path.c_str(), O_RDWR, 0600, myEnv)) != XrdOssOK)
220 {
221 TRACE(Error, err_prefix << "Open failed for info file " << cinfo_path << ERRNO_AND_ERRSTR(-cret));
222 delete myInfoFile;
223 myFile->Close(); delete myFile;
224 return;
225 }
226
227 // Allocate space for the data file.
228
229 if ((cret = posix_fallocate(myFile->getFD(), 0, file_size)))
230 {
231 TRACE(Error, err_prefix << "posix_fallocate failed for data file " << file_path << ERRNO_AND_ERRSTR(cret));
232 }
233
234 // Fill up cinfo.
235
236 Info myInfo(m_trace, false);
237 myInfo.SetBufferSizeFileSizeAndCreationTime(block_size, file_size);
238 myInfo.SetAllBitsSynced();
239
240 for (int i = 0; i < at_count; ++i)
241 {
242 time_t att_time = access_time[i] >= 0 ? access_time[i] : time_now + access_time[i];
243
244 myInfo.WriteIOStatSingle(file_size, att_time, att_time + access_duration[i]);
245 }
246
247 myInfo.Write(myInfoFile, cinfo_path.c_str());
248
249 // Fake last modified time to the last access_time
250 {
251 time_t last_detach;
252 myInfo.GetLatestDetachTime(last_detach);
253 struct timespec acc_mod_time[2] = { {last_detach, UTIME_OMIT}, {last_detach, 0} };
254
255 futimens(myInfoFile->getFD(), acc_mod_time);
256 }
257
258 myInfoFile->Close(); delete myInfoFile;
259 myFile->Close(); delete myFile;
260
261 struct stat dstat;
262 GetOss()->Stat(file_path.c_str(), &dstat);
263 TRACE(Info, err_prefix << "Created file '" << file_path << "', size=" << (file_size>>20) << "MB, "
264 << "st_blocks=" << dstat.st_blocks);
265
266 {
267 XrdSysCondVarHelper lock(&m_writeQ.condVar);
268
269 m_writeQ.writes_between_purges += file_size;
270 }
271 {
272 int token = m_res_mon->register_file_open(file_path, time_now, false);
273 XrdPfc::Stats stats;
274 stats.m_BytesWritten = file_size;
275 stats.m_StBlocksAdded = dstat.st_blocks;
276 m_res_mon->register_file_update_stats(token, stats);
277 m_res_mon->register_file_close(token, time(0), stats);
278 }
279 }
280 }
281
282 //================================================================
283 // remove_file
284 //================================================================
285
286 else if (token == "remove_file")
287 {
288 static const char* err_prefix = "ExecuteCommandUrl: /xrdpfc_command/remove_file: ";
289 static const char* usage =
290 "Usage: remove_file/ [-h] /<path>\n"
291 " Removes given file from the cache unless it is currently open.\n"
292 " Useful for removal of stale files or duplicate files in a caching cluster.\n"
293 "Notes:\n"
294 " . If no options are needed one should still leave a space between / separators, ie., '/ /'\n";
295
296 token = cp.get_token_as_string();
297 TRACE(Debug, err_prefix << "Entered with argument string '" << token <<"'.");
298 if (token.empty()) {
299 TRACE(Error, err_prefix << "Options section must not be empty, a single space character is OK.");
300 return;
301 }
302 TRACE(Debug, err_prefix << "File path (reminder of URL) is '" << cp.get_reminder() <<"'.");
303 if ( ! cp.has_reminder()) {
304 TRACE(Error, err_prefix << "Path section must not be empty.");
305 return;
306 }
307
308 SplitParser ap(token, " ");
309 char theOpt;
310
311 while ((theOpt = get_opt(ap)) != (char) -1)
312 {
313 switch (theOpt)
314 {
315 case 'h': {
316 m_log.Say(err_prefix, " -- printing help, no action will be taken\n", usage);
317 return;
318 }
319 default: {
320 TRACE(Error, err_prefix << "Unhandled command argument.");
321 return;
322 }
323 }
324 }
325
326 std::string f_name(cp.get_reminder_with_delim());
327
328 TRACE(Debug, err_prefix << "file argument '" << f_name << "'.");
329
330 int ret = UnlinkFile(f_name, true);
331
332 TRACE(Info, err_prefix << "returned with status " << ret);
333 }
334
335 //================================================================
336 // unknown command
337 //================================================================
338
339 else
340 {
341 TRACE(Error, top_epfx << "Unknown or empty command '" << token << "'");
342 }
343}
struct stat Stat
Definition XrdCks.cc:49
void usage()
#define XRDOSS_mkpath
Definition XrdOss.hh:466
#define ERRNO_AND_ERRSTR(err_code)
bool Create
virtual int getFD()
Definition XrdOss.hh:426
static int a2i(XrdSysError &, const char *emsg, const char *item, int *val, int minv=-1, int maxv=-1)
Definition XrdOuca2x.cc:45
int UnlinkFile(const std::string &f_name, bool fail_if_open)
Remove cinfo and data files from cache.
Definition XrdPfc.cc:1187
XrdOss * GetOss() const
Definition XrdPfc.hh:268
int register_file_open(const std::string &filename, time_t open_timestamp, bool existing_file)
void register_file_update_stats(int token_id, const Stats &stats)
void register_file_close(int token_id, time_t close_timestamp, const Stats &full_stats)
Statistics of cache utilisation by a File object.
long long m_StBlocksAdded
number of 512-byte blocks the file has grown by
long long m_BytesWritten
number of bytes written to disk
Contains parameters configurable from the xrootd config file.
Definition XrdPfc.hh:64

References XrdOuca2x::a2i(), XrdOuca2x::a2sz(), XrdOssDF::Close(), Create, Debug, ERRNO_AND_ERRSTR, Error, XrdPfc::SplitParser::get_reminder(), XrdPfc::SplitParser::get_reminder_with_delim(), XrdPfc::SplitParser::get_token(), XrdPfc::SplitParser::get_token_as_string(), XrdOssDF::getFD(), XrdPfc::Info::GetLatestDetachTime(), GetOss(), XrdPfc::SplitParser::has_reminder(), XrdPfc::Configuration::m_bufferSize, XrdPfc::Stats::m_BytesWritten, XrdPfc::Configuration::m_data_space, XrdPfc::Configuration::m_meta_space, XrdPfc::Stats::m_StBlocksAdded, XrdPfc::Configuration::m_username, XrdOss::newFile(), XrdOssDF::Open(), XrdOucEnv::Put(), XrdPfc::ResourceMonitor::register_file_close(), XrdPfc::ResourceMonitor::register_file_open(), XrdPfc::ResourceMonitor::register_file_update_stats(), XrdPfc::Info::s_infoExtension, XrdSysError::Say(), XrdPfc::Info::SetAllBitsSynced(), XrdPfc::Info::SetBufferSizeFileSizeAndCreationTime(), Stat, XrdOss::Stat(), stat, TRACE, UnlinkFile(), usage(), XrdPfc::Info::Write(), XrdPfc::Info::WriteIOStatSingle(), XRDOSS_mkpath, and XrdOssOK.

+ Here is the call graph for this function:

◆ FileSyncDone()

void Cache::FileSyncDone ( File * f,
bool high_debug )

Definition at line 542 of file XrdPfc.cc.

543{
544 dec_ref_cnt(f, high_debug);
545}

◆ GetFile()

File * Cache::GetFile ( const std::string & path,
IO * io,
long long off = 0,
long long filesize = 0 )

Definition at line 389 of file XrdPfc.cc.

390{
391 // Called from virtual IOFile constructor.
392
393 TRACE(Debug, "GetFile " << path << ", io " << io);
394
395 ActiveMap_i it;
396
397 {
398 XrdSysCondVarHelper lock(&m_active_cond);
399
400 while (true)
401 {
402 it = m_active.find(path);
403
404 // File is not open or being opened. Mark it as being opened and
405 // proceed to opening it outside of while loop.
406 if (it == m_active.end())
407 {
408 it = m_active.insert(std::make_pair(path, (File*) 0)).first;
409 break;
410 }
411
412 if (it->second != 0)
413 {
414 it->second->AddIO(io);
415 inc_ref_cnt(it->second, false, true);
416
417 return it->second;
418 }
419 else
420 {
421 // Wait for some change in m_active, then recheck.
422 m_active_cond.Wait();
423 }
424 }
425 }
426
427 // This is always true, now that IOFileBlock is unsupported.
428 if (filesize == 0)
429 {
430 struct stat st;
431 int res = io->Fstat(st);
432 if (res < 0) {
433 errno = res;
434 TRACE(Error, "GetFile, could not get valid stat");
435 } else if (res > 0) {
436 errno = ENOTSUP;
437 TRACE(Error, "GetFile, stat returned positive value, this should NOT happen here");
438 } else {
439 filesize = st.st_size;
440 }
441 }
442
443 File *file = 0;
444
445 if (filesize >= 0)
446 {
447 file = File::FileOpen(path, off, filesize);
448 }
449
450 {
451 XrdSysCondVarHelper lock(&m_active_cond);
452
453 if (file)
454 {
455 inc_ref_cnt(file, false, true);
456 it->second = file;
457
458 file->AddIO(io);
459 }
460 else
461 {
462 m_active.erase(it);
463 }
464
465 m_active_cond.Broadcast();
466 }
467
468 return file;
469}
virtual int Fstat(struct stat &sbuff)
static File * FileOpen(const std::string &path, long long offset, long long fileSize)
Static constructor that also does Open. Returns null ptr if Open fails.
void AddIO(IO *io)

References XrdPfc::File::AddIO(), XrdSysCondVar::Broadcast(), Debug, Error, XrdPfc::File::FileOpen(), XrdOucCacheIO::Fstat(), stat, TRACE, and XrdSysCondVar::Wait().

Referenced by XrdPfc::IOFile::IOFile().

+ Here is the call graph for this function:
+ Here is the caller graph for this function:

◆ GetGStream()

XrdXrootdGStream * XrdPfc::Cache::GetGStream ( )
inline

Definition at line 286 of file XrdPfc.hh.

286{ return m_gstream; }

◆ GetInstance()

Cache & Cache::GetInstance ( )
static

Singleton access.

Definition at line 132 of file XrdPfc.cc.

132{ return *m_instance; }

Referenced by XrdPfc::IOFile::IOFile(), XrdPfc::IOFileBlock::IOFileBlock(), Attach(), XrdPfc::IOFile::DetachFinalize(), XrdPfc::File::GetLog(), XrdPfc::File::GetTrace(), XrdPfc::ResourceMonitor::perform_purge_check(), XrdPfc::ResourceMonitor::perform_purge_task_cleanup(), PrefetchThread(), ProcessWriteTaskThread(), Proto_ResourceMonitorHeartBeat(), XrdPfc::File::Sync(), and XrdPfc::File::WriteBlockToDisk().

+ Here is the caller graph for this function:

◆ GetLog()

XrdSysError * XrdPfc::Cache::GetLog ( )
inline

Definition at line 282 of file XrdPfc.hh.

282{ return &m_log; }

Referenced by XrdPfc::File::GetLog().

+ Here is the caller graph for this function:

◆ GetNextFileToPrefetch()

File * Cache::GetNextFileToPrefetch ( )

Definition at line 732 of file XrdPfc.cc.

733{
734 m_prefetch_condVar.Lock();
735 while (m_prefetchList.empty())
736 {
737 m_prefetch_condVar.Wait();
738 }
739
740 // std::sort(m_prefetchList.begin(), m_prefetchList.end(), myobject);
741
742 size_t l = m_prefetchList.size();
743 int idx = rand() % l;
744 File* f = m_prefetchList[idx];
745
746 m_prefetch_condVar.UnLock();
747 return f;
748}

References XrdSysCondVar::Lock(), XrdSysCondVar::UnLock(), and XrdSysCondVar::Wait().

Referenced by Prefetch().

+ Here is the call graph for this function:
+ Here is the caller graph for this function:

◆ GetOss()

XrdOss * XrdPfc::Cache::GetOss ( ) const
inline

Definition at line 268 of file XrdPfc.hh.

268{ return m_oss; }

Referenced by ExecuteCommandUrl().

+ Here is the caller graph for this function:

◆ GetPurgePin()

PurgePin * XrdPfc::Cache::GetPurgePin ( ) const
inline

Definition at line 272 of file XrdPfc.hh.

272{ return m_purge_pin; }

Referenced by XrdPfc::ResourceMonitor::perform_purge_check().

+ Here is the caller graph for this function:

◆ GetTrace()

XrdSysTrace * XrdPfc::Cache::GetTrace ( )
inline

Definition at line 283 of file XrdPfc.hh.

283{ return m_trace; }

Referenced by XrdPfc::File::GetTrace(), and XrdPfc::IO::GetTrace().

+ Here is the caller graph for this function:

◆ IsFileActiveOrPurgeProtected()

bool Cache::IsFileActiveOrPurgeProtected ( const std::string & path) const

Definition at line 676 of file XrdPfc.cc.

677{
678 XrdSysCondVarHelper lock(&m_active_cond);
679
680 return m_active.find(path) != m_active.end() ||
681 m_purge_delay_set.find(path) != m_purge_delay_set.end();
682}

◆ LocalFilePath()

int Cache::LocalFilePath ( const char * curl,
char * buff = 0,
int blen = 0,
LFP_Reason why = ForAccess,
bool forall = false )
virtual

Get the path to a file that is complete in the local cache. By default, the file must be complete in the cache (i.e. no blocks are missing). This can be overridden. This path can be used to access the file on the local node.

Returns
0 - the file is complete and the local path to the file is in the buffer, if it has been supllied.
<0 - the request could not be fulfilled. The return value is -errno describing why. If a buffer was supplied and a path could be generated it is returned only if "why" is ForCheck or ForInfo. Otherwise, a null path is returned.
>0 - Reserved for future use.

Reimplemented from XrdOucCache.

Definition at line 793 of file XrdPfc.cc.

795{
796 static const mode_t groupReadable = S_IRUSR | S_IWUSR | S_IRGRP;
797 static const mode_t worldReadable = S_IRUSR | S_IWUSR | S_IRGRP | S_IROTH;
798 static const char *lfpReason[] = { "ForAccess", "ForInfo", "ForPath" };
799
800 TRACE(Debug, "LocalFilePath '" << curl << "', why=" << lfpReason[why]);
801
802 if (buff && blen > 0) buff[0] = 0;
803
804 XrdCl::URL url(curl);
805 std::string f_name = url.GetPath();
806 std::string i_name = f_name + Info::s_infoExtension;
807
808 if (why == ForPath)
809 {
810 int ret = m_oss->Lfn2Pfn(f_name.c_str(), buff, blen);
811 TRACE(Info, "LocalFilePath '" << curl << "', why=" << lfpReason[why] << " -> " << ret);
812 return ret;
813 }
814
815 {
816 XrdSysCondVarHelper lock(&m_active_cond);
817 m_purge_delay_set.insert(f_name);
818 }
819
820 struct stat sbuff, sbuff2;
821 if (m_oss->Stat(f_name.c_str(), &sbuff) == XrdOssOK &&
822 m_oss->Stat(i_name.c_str(), &sbuff2) == XrdOssOK)
823 {
824 if (S_ISDIR(sbuff.st_mode))
825 {
826 TRACE(Info, "LocalFilePath '" << curl << "', why=" << lfpReason[why] << " -> EISDIR");
827 return -EISDIR;
828 }
829 else
830 {
831 bool read_ok = false;
832 bool is_complete = false;
833
834 // Lock and check if the file is active. If NOT, keep the lock
835 // and add dummy access after successful reading of info file.
836 // If it IS active, just release the lock, this ongoing access will
837 // assure the file continues to exist.
838
839 // XXXX How can I just loop over the cinfo file when active?
840 // Can I not get is_complete from the existing file?
841 // Do I still want to inject access record?
842 // Oh, it writes only if not active .... still let's try to use existing File.
843
844 m_active_cond.Lock();
845
846 bool is_active = m_active.find(f_name) != m_active.end();
847
848 if (is_active) m_active_cond.UnLock();
849
850 XrdOssDF* infoFile = m_oss->newFile(m_configuration.m_username.c_str());
851 XrdOucEnv myEnv;
852 int res = infoFile->Open(i_name.c_str(), O_RDWR, 0600, myEnv);
853 if (res >= 0)
854 {
855 Info info(m_trace, 0);
856 if (info.Read(infoFile, i_name.c_str()))
857 {
858 read_ok = true;
859
860 is_complete = info.IsComplete();
861
862 // Add full-size access if reason is for access.
863 if ( ! is_active && is_complete && why == ForAccess)
864 {
865 info.WriteIOStatSingle(info.GetFileSize());
866 info.Write(infoFile, i_name.c_str());
867 }
868 }
869 infoFile->Close();
870 }
871 delete infoFile;
872
873 if ( ! is_active) m_active_cond.UnLock();
874
875 if (read_ok)
876 {
877 if ((is_complete || why == ForInfo) && buff != 0)
878 {
879 int res2 = m_oss->Lfn2Pfn(f_name.c_str(), buff, blen);
880 if (res2 < 0)
881 return res2;
882
883 // Normally, files are owned by us but when direct cache access
884 // is wanted and possible, make sure the file is world readable.
885 if (why == ForAccess)
886 {mode_t mode = (forall ? worldReadable : groupReadable);
887 if (((sbuff.st_mode & worldReadable) != mode)
888 && (m_oss->Chmod(f_name.c_str(), mode) != XrdOssOK))
889 {is_complete = false;
890 *buff = 0;
891 }
892 }
893 }
894
895 TRACE(Info, "LocalFilePath '" << curl << "', why=" << lfpReason[why] <<
896 (is_complete ? " -> FILE_COMPLETE_IN_CACHE" : " -> EREMOTE"));
897
898 return is_complete ? 0 : -EREMOTE;
899 }
900 }
901 }
902
903 TRACE(Info, "LocalFilePath '" << curl << "', why=" << lfpReason[why] << " -> ENOENT");
904 return -ENOENT;
905}
virtual int Chmod(const char *path, mode_t mode, XrdOucEnv *envP=0)=0

References XrdOss::Chmod(), XrdOssDF::Close(), Debug, XrdOucCache::ForAccess, XrdOucCache::ForInfo, XrdOucCache::ForPath, XrdPfc::Info::GetFileSize(), XrdCl::URL::GetPath(), XrdPfc::Info::IsComplete(), XrdOss::Lfn2Pfn(), XrdSysCondVar::Lock(), XrdPfc::Configuration::m_username, XrdOss::newFile(), XrdOssDF::Open(), XrdPfc::Info::Read(), XrdPfc::Info::s_infoExtension, XrdOss::Stat(), stat, TRACE, XrdSysCondVar::UnLock(), XrdPfc::Info::Write(), XrdPfc::Info::WriteIOStatSingle(), and XrdOssOK.

+ Here is the call graph for this function:

◆ Prefetch()

void Cache::Prefetch ( )

Definition at line 751 of file XrdPfc.cc.

752{
753 const long long limit_RAM = m_configuration.m_RamAbsAvailable * 7 / 10;
754
755 while (true)
756 {
757 m_RAM_mutex.Lock();
758 bool doPrefetch = (m_RAM_used < limit_RAM);
759 m_RAM_mutex.UnLock();
760
761 if (doPrefetch)
762 {
764 f->Prefetch();
765 }
766 else
767 {
769 }
770 }
771}
File * GetNextFileToPrefetch()
Definition XrdPfc.cc:732
static void Wait(int milliseconds)

References GetNextFileToPrefetch(), XrdSysMutex::Lock(), XrdPfc::Configuration::m_RamAbsAvailable, XrdPfc::File::Prefetch(), XrdSysMutex::UnLock(), and XrdSysTimer::Wait().

Referenced by PrefetchThread().

+ Here is the call graph for this function:
+ Here is the caller graph for this function:

◆ Prepare()

int Cache::Prepare ( const char * curl,
int oflags,
mode_t mode )
virtual

Preapare the cache for a file open request. This method is called prior to actually opening a file. This method is meant to allow defering an open request or implementing the full I/O stack in the cache layer.

Returns
<0 Error has occurred, return value is -errno; fail open request. =0 Continue with open() request. >0 Defer open but treat the file as actually being open. Use the XrdOucCacheIO::Open() method to open the file at a later time.

Reimplemented from XrdOucCache.

Definition at line 1065 of file XrdPfc.cc.

1066{
1067 XrdCl::URL url(curl);
1068 std::string f_name = url.GetPath();
1069 std::string i_name = f_name + Info::s_infoExtension;
1070
1071 // Do not allow write access.
1072 if (oflags & (O_WRONLY | O_RDWR | O_APPEND | O_CREAT))
1073 {
1074 TRACE(Warning, "Prepare write access requested on file " << f_name << ". Denying access.");
1075 return -EROFS;
1076 }
1077
1078 // Intercept xrdpfc_command requests.
1079 if (m_configuration.m_allow_xrdpfc_command && strncmp("/xrdpfc_command/", f_name.c_str(), 16) == 0)
1080 {
1081 // Schedule a job to process command request.
1082 {
1083 CommandExecutor *ce = new CommandExecutor(f_name, "CommandExecutor");
1084
1085 schedP->Schedule(ce);
1086 }
1087
1088 return -EAGAIN;
1089 }
1090
1091 {
1092 XrdSysCondVarHelper lock(&m_active_cond);
1093 m_purge_delay_set.insert(f_name);
1094 }
1095
1096 struct stat sbuff;
1097 if (m_oss->Stat(i_name.c_str(), &sbuff) == XrdOssOK)
1098 {
1099 TRACE(Dump, "Prepare defer open " << f_name);
1100 return 1;
1101 }
1102 else
1103 {
1104 return 0;
1105 }
1106}
static XrdScheduler * schedP
Definition XrdPfc.hh:290
void Schedule(XrdJob *jp)

References XrdCl::URL::GetPath(), XrdPfc::Configuration::m_allow_xrdpfc_command, XrdPfc::Info::s_infoExtension, schedP, XrdScheduler::Schedule(), XrdOss::Stat(), stat, TRACE, and XrdOssOK.

+ Here is the call graph for this function:

◆ ProcessWriteTasks()

void Cache::ProcessWriteTasks ( )

Separate task which writes blocks from ram to disk.

Definition at line 273 of file XrdPfc.cc.

274{
275 std::vector<Block*> blks_to_write(m_configuration.m_wqueue_blocks);
276
277 while (true)
278 {
279 m_writeQ.condVar.Lock();
280 while (m_writeQ.size == 0)
281 {
282 m_writeQ.condVar.Wait();
283 }
284
285 // MT -- optimize to pop several blocks if they are available (or swap the list).
286 // This makes sense especially for smallish block sizes.
287
288 int n_pushed = std::min(m_writeQ.size, m_configuration.m_wqueue_blocks);
289 long long sum_size = 0;
290
291 for (int bi = 0; bi < n_pushed; ++bi)
292 {
293 Block* block = m_writeQ.queue.front();
294 m_writeQ.queue.pop_front();
295 m_writeQ.writes_between_purges += block->get_size();
296 sum_size += block->get_size();
297
298 blks_to_write[bi] = block;
299
300 TRACE(Dump, "ProcessWriteTasks for block " << (void*)(block) << " path " << block->m_file->lPath());
301 }
302 m_writeQ.size -= n_pushed;
303
304 m_writeQ.condVar.UnLock();
305
306 {
307 XrdSysMutexHelper lock(&m_RAM_mutex);
308 m_RAM_write_queue -= sum_size;
309 }
310
311 for (int bi = 0; bi < n_pushed; ++bi)
312 {
313 Block* block = blks_to_write[bi];
314
315 block->m_file->WriteBlockToDisk(block);
316 }
317 }
318}
const char * lPath() const
Log path.
void WriteBlockToDisk(Block *b)

References XrdPfc::Block::get_size(), XrdSysCondVar::Lock(), XrdPfc::File::lPath(), XrdPfc::Block::m_file, XrdPfc::Configuration::m_wqueue_blocks, TRACE, XrdSysCondVar::UnLock(), XrdSysCondVar::Wait(), and XrdPfc::File::WriteBlockToDisk().

Referenced by ProcessWriteTaskThread().

+ Here is the call graph for this function:
+ Here is the caller graph for this function:

◆ RefConfiguration()

const Configuration & XrdPfc::Cache::RefConfiguration ( ) const
inline

Reference XrdPfc configuration.

Definition at line 204 of file XrdPfc.hh.

204{ return m_configuration; }

Referenced by XrdPfc::IOFileBlock::IOFileBlock(), Attach(), Conf(), XrdPfc::File::WriteBlockToDisk(), and XrdOucGetCache().

+ Here is the caller graph for this function:

◆ RefResMon()

ResourceMonitor & XrdPfc::Cache::RefResMon ( )
inline

Definition at line 285 of file XrdPfc.hh.

285{ return *m_res_mon; }

Referenced by ResMon().

+ Here is the caller graph for this function:

◆ RegisterPrefetchFile()

void Cache::RegisterPrefetchFile ( File * file)

Definition at line 694 of file XrdPfc.cc.

695{
696 // Can be called with other locks held.
697
698 if ( ! m_prefetch_enabled)
699 {
700 return;
701 }
702
703 m_prefetch_condVar.Lock();
704 m_prefetchList.push_back(file);
705 m_prefetch_condVar.Signal();
706 m_prefetch_condVar.UnLock();
707}

References XrdSysCondVar::Lock(), XrdSysCondVar::Signal(), and XrdSysCondVar::UnLock().

+ Here is the call graph for this function:

◆ ReleaseFile()

void Cache::ReleaseFile ( File * f,
IO * io )

Definition at line 471 of file XrdPfc.cc.

472{
473 // Called from virtual IO::DetachFinalize.
474
475 TRACE(Debug, "ReleaseFile " << f->GetLocalPath() << ", io " << io);
476
477 {
478 XrdSysCondVarHelper lock(&m_active_cond);
479
480 f->RemoveIO(io);
481 }
482 dec_ref_cnt(f, true);
483}
void RemoveIO(IO *io)

References Debug, XrdPfc::File::GetLocalPath(), XrdPfc::File::RemoveIO(), and TRACE.

Referenced by XrdPfc::IOFile::DetachFinalize(), and XrdPfc::IOFileBlock::DetachFinalize().

+ Here is the call graph for this function:
+ Here is the caller graph for this function:

◆ ReleaseRAM()

void Cache::ReleaseRAM ( char * buf,
long long size )

Definition at line 371 of file XrdPfc.cc.

372{
373 bool std_size = (size == m_configuration.m_bufferSize);
374 {
375 XrdSysMutexHelper lock(&m_RAM_mutex);
376
377 m_RAM_used -= size;
378
379 if (std_size && m_RAM_std_size < m_configuration.m_RamKeepStdBlocks)
380 {
381 m_RAM_std_blocks.push_back(buf);
382 ++m_RAM_std_size;
383 return;
384 }
385 }
386 free(buf);
387}

References XrdPfc::Configuration::m_bufferSize, and XrdPfc::Configuration::m_RamKeepStdBlocks.

◆ RemoveWriteQEntriesFor()

void Cache::RemoveWriteQEntriesFor ( File * f)

Remove blocks from write queue which belong to given prefetch. This method is used at the time of File destruction.

Definition at line 240 of file XrdPfc.cc.

241{
242 std::list<Block*> removed_blocks;
243 long long sum_size = 0;
244
245 m_writeQ.condVar.Lock();
246 std::list<Block*>::iterator i = m_writeQ.queue.begin();
247 while (i != m_writeQ.queue.end())
248 {
249 if ((*i)->m_file == file)
250 {
251 TRACE(Dump, "Remove entries for " << (void*)(*i) << " path " << file->lPath());
252 std::list<Block*>::iterator j = i++;
253 removed_blocks.push_back(*j);
254 sum_size += (*j)->get_size();
255 m_writeQ.queue.erase(j);
256 --m_writeQ.size;
257 }
258 else
259 {
260 ++i;
261 }
262 }
263 m_writeQ.condVar.UnLock();
264
265 {
266 XrdSysMutexHelper lock(&m_RAM_mutex);
267 m_RAM_write_queue -= sum_size;
268 }
269
270 file->BlocksRemovedFromWriteQ(removed_blocks);
271}

References XrdPfc::File::BlocksRemovedFromWriteQ(), XrdSysCondVar::Lock(), XrdPfc::File::lPath(), TRACE, and XrdSysCondVar::UnLock().

Referenced by UnlinkFile().

+ Here is the call graph for this function:
+ Here is the caller graph for this function:

◆ RequestRAM()

char * Cache::RequestRAM ( long long size)

Definition at line 331 of file XrdPfc.cc.

332{
333 static const size_t s_block_align = sysconf(_SC_PAGESIZE);
334
335 bool std_size = (size == m_configuration.m_bufferSize);
336
337 m_RAM_mutex.Lock();
338
339 long long total = m_RAM_used + size;
340
341 if (total <= m_configuration.m_RamAbsAvailable)
342 {
343 m_RAM_used = total;
344 if (std_size && m_RAM_std_size > 0)
345 {
346 char *buf = m_RAM_std_blocks.back();
347 m_RAM_std_blocks.pop_back();
348 --m_RAM_std_size;
349
350 m_RAM_mutex.UnLock();
351
352 return buf;
353 }
354 else
355 {
356 m_RAM_mutex.UnLock();
357 char *buf;
358 if (posix_memalign((void**) &buf, s_block_align, (size_t) size))
359 {
360 // Report out of mem? Probably should report it at least the first time,
361 // then periodically.
362 return 0;
363 }
364 return buf;
365 }
366 }
367 m_RAM_mutex.UnLock();
368 return 0;
369}

References XrdSysMutex::Lock(), XrdPfc::Configuration::m_bufferSize, XrdPfc::Configuration::m_RamAbsAvailable, and XrdSysMutex::UnLock().

+ Here is the call graph for this function:

◆ ResMon()

ResourceMonitor & Cache::ResMon ( )
static

Definition at line 135 of file XrdPfc.cc.

135{ return m_instance->RefResMon(); }
ResourceMonitor & RefResMon()
Definition XrdPfc.hh:285

References RefResMon().

Referenced by XrdPfc::ResourceMonitor::perform_purge_check(), ResourceMonitorThread(), and XrdPfc::UnlinkPurgeStateFilesInMap().

+ Here is the call graph for this function:
+ Here is the caller graph for this function:

◆ ScheduleFileSync()

void XrdPfc::Cache::ScheduleFileSync ( File * f)
inline

Definition at line 278 of file XrdPfc.hh.

278{ schedule_file_sync(f, false, false); }

◆ Stat()

int Cache::Stat ( const char * curl,
struct stat & sbuff )
virtual
Returns
<0 - Stat failed, value is -errno. =0 - Stat succeeded, sbuff holds stat information. >0 - Stat could not be done, forward operation to next level.

Reimplemented from XrdOucCache.

Definition at line 1116 of file XrdPfc.cc.

1117{
1118 const char *tpfx = "Stat ";
1119
1120 XrdCl::URL url(curl);
1121 std::string f_name = url.GetPath();
1122
1123 File *file = nullptr;
1124 {
1125 XrdSysCondVarHelper lock(&m_active_cond);
1126 auto it = m_active.find(f_name);
1127 if (it != m_active.end()) {
1128 file = it->second;
1129 // If `file` is nullptr, the file-open is in progress; instead
1130 // of waiting for the file-open to finish, simply treat it as if
1131 // the file-open doesn't exist.
1132 if (file) {
1133 inc_ref_cnt(file, false, false);
1134 }
1135 }
1136 }
1137 if (file) {
1138 int res = file->Fstat(sbuff);
1139 dec_ref_cnt(file, false);
1140 TRACE(Debug, tpfx << "from active file " << curl << " -> " << res);
1141 return res;
1142 }
1143
1144 int res = m_oss->Stat(f_name.c_str(), &sbuff);
1145 if (res != XrdOssOK) {
1146 TRACE(Debug, tpfx << curl << " -> " << res);
1147 return 1; // res; -- for only-if-cached
1148 }
1149 if (S_ISDIR(sbuff.st_mode))
1150 {
1151 TRACE(Debug, tpfx << curl << " -> EISDIR");
1152 return -EISDIR;
1153 }
1154
1155 long long file_size = DetermineFullFileSize(f_name + Info::s_infoExtension);
1156 if (file_size < 0) {
1157 TRACE(Debug, tpfx << curl << " -> " << file_size);
1158 return 1; // (int) file_size; -- for only-if-cached
1159 }
1160 sbuff.st_size = file_size;
1161 bool is_cached = DecideIfConsideredCached(file_size, sbuff.st_blocks * 512ll);
1162 if ( ! is_cached)
1163 sbuff.st_atime = 0;
1164
1165 TRACE(Debug, tpfx << "from disk " << curl << " -> " << res);
1166
1167 return 0;
1168}

References Debug, DecideIfConsideredCached(), DetermineFullFileSize(), XrdPfc::File::Fstat(), XrdCl::URL::GetPath(), XrdPfc::Info::s_infoExtension, XrdOss::Stat(), TRACE, and XrdOssOK.

+ Here is the call graph for this function:

◆ TheOne()

const Cache & Cache::TheOne ( )
static

Definition at line 133 of file XrdPfc.cc.

133{ return *m_instance; }

Referenced by XrdPfc::OldStylePurgeDriver(), and XrdPfc::UnlinkPurgeStateFilesInMap().

+ Here is the caller graph for this function:

◆ Unlink()

int Cache::Unlink ( const char * curl)
virtual
Returns
<0 - Stat failed, value is -errno. =0 - Stat succeeded, sbuff holds stat information.

Reimplemented from XrdOucCache.

Definition at line 1177 of file XrdPfc.cc.

1178{
1179 XrdCl::URL url(curl);
1180 std::string f_name = url.GetPath();
1181
1182 // printf("Unlink url=%s\n\t fname=%s\n", curl, f_name.c_str());
1183
1184 return UnlinkFile(f_name, false);
1185}

References XrdCl::URL::GetPath(), and UnlinkFile().

+ Here is the call graph for this function:

◆ UnlinkFile()

int Cache::UnlinkFile ( const std::string & f_name,
bool fail_if_open )

Remove cinfo and data files from cache.

Definition at line 1187 of file XrdPfc.cc.

1188{
1189 static const char* trc_pfx = "UnlinkFile ";
1190 ActiveMap_i it;
1191 File *file = 0;
1192 long long st_blocks_to_purge = 0;
1193 {
1194 XrdSysCondVarHelper lock(&m_active_cond);
1195
1196 it = m_active.find(f_name);
1197
1198 if (it != m_active.end())
1199 {
1200 if (fail_if_open)
1201 {
1202 TRACE(Info, trc_pfx << f_name << ", file currently open and force not requested - denying request");
1203 return -EBUSY;
1204 }
1205
1206 // Null File* in m_active map means an operation is ongoing, probably
1207 // Attach() with possible File::Open(). Ask for retry.
1208 if (it->second == 0)
1209 {
1210 TRACE(Info, trc_pfx << f_name << ", an operation on this file is ongoing - denying request");
1211 return -EAGAIN;
1212 }
1213
1214 file = it->second;
1215 st_blocks_to_purge = file->initiate_emergency_shutdown();
1216 it->second = 0;
1217 }
1218 else
1219 {
1220 it = m_active.insert(std::make_pair(f_name, (File*) 0)).first;
1221 }
1222 }
1223
1224 if (file) {
1226 } else {
1227 struct stat f_stat;
1228 if (m_oss->Stat(f_name.c_str(), &f_stat) == XrdOssOK)
1229 st_blocks_to_purge = f_stat.st_blocks;
1230 }
1231
1232 std::string i_name = f_name + Info::s_infoExtension;
1233
1234 // Unlink file & cinfo
1235 int f_ret = m_oss->Unlink(f_name.c_str());
1236 int i_ret = m_oss->Unlink(i_name.c_str());
1237
1238 if (st_blocks_to_purge)
1239 m_res_mon->register_file_purge(f_name, st_blocks_to_purge);
1240
1241 TRACE(Debug, trc_pfx << f_name << ", f_ret=" << f_ret << ", i_ret=" << i_ret);
1242
1243 {
1244 XrdSysCondVarHelper lock(&m_active_cond);
1245
1246 m_active.erase(it);
1247 }
1248
1249 return std::min(f_ret, i_ret);
1250}
virtual int Unlink(const char *path, int Opts=0, XrdOucEnv *envP=0)=0
void RemoveWriteQEntriesFor(File *f)
Remove blocks from write queue which belong to given prefetch. This method is used at the time of Fil...
Definition XrdPfc.cc:240
long long initiate_emergency_shutdown()
void register_file_purge(DirState *target, long long size_in_st_blocks)

References Debug, XrdPfc::File::initiate_emergency_shutdown(), XrdPfc::ResourceMonitor::register_file_purge(), RemoveWriteQEntriesFor(), XrdPfc::Info::s_infoExtension, XrdOss::Stat(), stat, TRACE, XrdOss::Unlink(), and XrdOssOK.

Referenced by ExecuteCommandUrl(), XrdPfcFSctl::FSctl(), XrdPfc::File::Sync(), and Unlink().

+ Here is the call graph for this function:
+ Here is the caller graph for this function:

◆ VCheck()

static bool XrdPfc::Cache::VCheck ( XrdVersionInfo & urVersion)
inlinestatic

Version check.

Definition at line 233 of file XrdPfc.hh.

233{ return true; }

◆ WriteFileSizeXAttr()

void Cache::WriteFileSizeXAttr ( int cinfo_fd,
long long file_size )

Definition at line 910 of file XrdPfc.cc.

911{
912 if (m_metaXattr) {
913 int res = XrdSysXAttrActive->Set("pfc.fsize", &file_size, sizeof(long long), 0, cinfo_fd, 0);
914 if (res != 0) {
915 TRACE(Debug, "WriteFileSizeXAttr error setting xattr " << res);
916 }
917 }
918}
virtual int Set(const char *Aname, const void *Aval, int Avsz, const char *Path, int fd=-1, int isNew=0)=0

References Debug, XrdSysXAttr::Set(), TRACE, and XrdSysXAttrActive.

+ Here is the call graph for this function:

◆ WritesSinceLastCall()

long long Cache::WritesSinceLastCall ( )

Definition at line 320 of file XrdPfc.cc.

321{
322 // Called from ResourceMonitor for an alternative estimation of disk writes.
323 XrdSysCondVarHelper lock(&m_writeQ.condVar);
324 long long ret = m_writeQ.writes_between_purges;
325 m_writeQ.writes_between_purges = 0;
326 return ret;
327}

Referenced by XrdPfc::ResourceMonitor::perform_purge_check().

+ Here is the caller graph for this function:

Member Data Documentation

◆ schedP

XrdScheduler * Cache::schedP = nullptr
static

The documentation for this class was generated from the following files: