XRootD
Loading...
Searching...
No Matches
XrdPfc::Cache Class Reference

Attaches/creates and detaches/deletes cache-io objects for the disk-based cache. More...

#include <XrdPfc.hh>

+ Inheritance diagram for XrdPfc::Cache:
+ Collaboration diagram for XrdPfc::Cache:

Public Member Functions

 Cache (XrdSysLogger *logger, XrdOucEnv *env)
 Constructor.
 
void AddWriteTask (Block *b, bool from_read)
 Add a downloaded block to the write queue.
 
virtual XrdOucCacheIO * Attach (XrdOucCacheIO *, int Options=0)
 
void ClearPurgeProtectedSet ()
 
bool Config (const char *config_filename, const char *parameters)
 Parse configuration file.
 
virtual int ConsiderCached (const char *url)
 
bool Decide (XrdOucCacheIO *)
 Makes decision if the original XrdOucCacheIO should be cached.
 
bool DecideIfConsideredCached (long long file_size, long long bytes_on_disk)
 
void DeRegisterPrefetchFile (File *)
 
long long DetermineFullFileSize (const std::string &cinfo_fname)
 
void ExecuteCommandUrl (const std::string &command_url)
 
void FileSyncDone (File *, bool high_debug)
 
File * GetFile (const std::string &, IO *, long long off=0, long long filesize=0)
 
XrdXrootdGStream * GetGStream ()
 
XrdSysError * GetLog ()
 
File * GetNextFileToPrefetch ()
 
XrdOss * GetOss () const
 
PurgePin * GetPurgePin () const
 
XrdSysTrace * GetTrace ()
 
bool IsFileActiveOrPurgeProtected (const std::string &) const
 
virtual int LocalFilePath (const char *url, char *buff=0, int blen=0, LFP_Reason why=ForAccess, bool forall=false)
 
void Prefetch ()
 
virtual int Prepare (const char *url, int oflags, mode_t mode)
 
void ProcessWriteTasks ()
 Separate task which writes blocks from ram to disk.
 
const Configuration & RefConfiguration () const
 Reference XrdPfc configuration.
 
ResourceMonitor & RefResMon ()
 
void RegisterPrefetchFile (File *)
 
void ReleaseFile (File *, IO *)
 
void ReleaseRAM (char *buf, long long size)
 
void RemoveWriteQEntriesFor (File *f)
 Remove blocks from the write queue that belong to the given file. This method is used at the time of File destruction.
 
char * RequestRAM (long long size)
 
void ScheduleFileSync (File *f)
 
virtual int Stat (const char *url, struct stat &sbuff)
 
virtual int Unlink (const char *url)
 
int UnlinkFile (const std::string &f_name, bool fail_if_open)
 Remove cinfo and data files from cache.
 
void WriteFileSizeXAttr (int cinfo_fd, long long file_size)
 
long long WritesSinceLastCall ()
 
- Public Member Functions inherited from XrdOucCache
 XrdOucCache (const char *ctype)
 
virtual ~XrdOucCache ()
 Destructor.
 
virtual int Rename (const char *oldp, const char *newp)
 
virtual int Rmdir (const char *dirp)
 
virtual int Truncate (const char *path, off_t size)
 
virtual int Xeq (XeqCmd cmd, char *arg, int arglen)
 

Static Public Member Functions

static const Configuration & Conf ()
 
static Cache & CreateInstance (XrdSysLogger *logger, XrdOucEnv *env)
 Singleton creation.
 
static Cache & GetInstance ()
 Singleton access.
 
static ResourceMonitor & ResMon ()
 
static const Cache & TheOne ()
 
static bool VCheck (XrdVersionInfo &urVersion)
 Version check.
 

Static Public Attributes

static XrdScheduler * schedP = nullptr
 
- Static Public Attributes inherited from XrdOucCache
static const int optFIS = 0x0001
 File is structured (e.g. root file)
 
static const int optNEW = 0x0014
 File is new -> optRW (o/w read or write)
 
static const int optRW = 0x0004
 File is read/write (o/w read/only)
 
static const int optWIN = 0x0024
 File is new -> optRW use write-in cache.
 

Additional Inherited Members

- Public Types inherited from XrdOucCache
enum  LFP_Reason {
  ForAccess =0 ,
  ForInfo ,
  ForPath
}
 
enum  XeqCmd { xeqNoop = 0 }
 
- Public Attributes inherited from XrdOucCache
const char CacheType [8]
 A 1-to-7 character cache type identifier (usually pfc or rmc).
 
XrdOucCacheStats Statistics
 

Detailed Description

Attaches/creates and detaches/deletes cache-io objects for the disk-based cache.

Definition at line 153 of file XrdPfc.hh.

Constructor & Destructor Documentation

◆ Cache()

Cache::Cache ( XrdSysLogger * logger,
XrdOucEnv * env )

Constructor.

Definition at line 158 of file XrdPfc.cc.

158 :
159 XrdOucCache("pfc"),
160 m_env(env),
161 m_log(logger, "XrdPfc_"),
162 m_trace(new XrdSysTrace("XrdPfc", logger)),
163 m_traceID("Cache"),
164 m_oss(0),
165 m_gstream(0),
166 m_purge_pin(0),
167 m_prefetch_condVar(0),
168 m_prefetch_enabled(false),
169 m_RAM_used(0),
170 m_RAM_write_queue(0),
171 m_RAM_std_size(0),
172 m_isClient(false),
173 m_active_cond(0)
174{
175 // Default log level is Warning.
176 m_trace->What = 2;
177}
XrdOucCache(const char *ctype)

References XrdOucCache::XrdOucCache().

Referenced by CreateInstance(), GetInstance(), and TheOne().

+ Here is the call graph for this function:
+ Here is the caller graph for this function:

Member Function Documentation

◆ AddWriteTask()

void Cache::AddWriteTask ( Block * b,
bool from_read )

Add a downloaded block to the write queue.

Definition at line 225 of file XrdPfc.cc.

226{
227 TRACE(Dump, "AddWriteTask() offset=" << b->m_offset << ". file " << b->get_file()->GetLocalPath());
228
229 {
230 XrdSysMutexHelper lock(&m_RAM_mutex);
231 m_RAM_write_queue += b->get_size();
232 }
233
234 m_writeQ.condVar.Lock();
235 if (fromRead)
236 m_writeQ.queue.push_back(b);
237 else
238 m_writeQ.queue.push_front(b);
239 m_writeQ.size++;
240 m_writeQ.condVar.Signal();
241 m_writeQ.condVar.UnLock();
242}
#define TRACE(act, x)
Definition XrdTrace.hh:63
int get_size() const
long long m_offset
File * get_file() const
const std::string & GetLocalPath() const

References XrdPfc::Block::get_file(), XrdPfc::Block::get_size(), XrdPfc::File::GetLocalPath(), XrdPfc::Block::m_offset, and TRACE.

+ Here is the call graph for this function:

◆ Attach()

XrdOucCacheIO * Cache::Attach ( XrdOucCacheIO * io,
int Options = 0 )
virtual

Implements XrdOucCache.

Definition at line 179 of file XrdPfc.cc.

180{
181 const char* tpfx = "Attach() ";
182
183 if (Options & XrdOucCache::optRW)
184 {
185 TRACE(Info, tpfx << "passing through write operation" << obfuscateAuth(io->Path()));
186 }
187 else if (Cache::GetInstance().Decide(io))
188 {
189 TRACE(Info, tpfx << obfuscateAuth(io->Path()));
190
191 IO *cio;
192
193 if (Cache::GetInstance().RefConfiguration().m_hdfsmode)
194 {
195 cio = new IOFileBlock(io, *this);
196 }
197 else
198 {
199 IOFile *iof = new IOFile(io, *this);
200
201 if ( ! iof->HasFile())
202 {
203 delete iof;
204 // TODO - redirect instead. But this is kind of an awkward place for it.
205 // errno is set during IOFile construction.
206 TRACE(Error, tpfx << "Failed opening local file, falling back to remote access " << io->Path());
207 return io;
208 }
209
210 cio = iof;
211 }
212
213 TRACE_PC(Debug, const char* loc = io->Location(), tpfx << io->Path() << " location: " <<
214 ((loc && loc[0] != 0) ? loc : "<deferred open>"));
215
216 return cio;
217 }
218 else
219 {
220 TRACE(Info, tpfx << "decision decline " << io->Path());
221 }
222 return io;
223}
std::string obfuscateAuth(const std::string &input)
#define TRACE_PC(act, pre_code, x)
bool Debug
virtual const char * Path()=0
virtual const char * Location(bool refresh=false)
static const int optRW
File is read/write (o/w read/only)
const Configuration & RefConfiguration() const
Reference XrdPfc configuration.
Definition XrdPfc.hh:206
static Cache & GetInstance()
Singleton access.
Definition XrdPfc.cc:132
bool Decide(XrdOucCacheIO *)
Makes decision if the original XrdOucCacheIO should be cached.
Definition XrdPfc.cc:137
bool HasFile() const
Check if File was opened successfully.

References Debug, Decide(), Error, GetInstance(), XrdPfc::IOFile::HasFile(), XrdOucCacheIO::Location(), obfuscateAuth(), XrdOucCache::optRW, XrdOucCacheIO::Path(), RefConfiguration(), TRACE, and TRACE_PC.

+ Here is the call graph for this function:

◆ ClearPurgeProtectedSet()

void Cache::ClearPurgeProtectedSet ( )

Definition at line 688 of file XrdPfc.cc.

689{
690 XrdSysCondVarHelper lock(&m_active_cond);
691 m_purge_delay_set.clear();
692}

Referenced by XrdPfc::ResourceMonitor::perform_purge_check(), and XrdPfc::ResourceMonitor::perform_purge_task_cleanup().

+ Here is the caller graph for this function:

◆ Conf()

const Configuration & Cache::Conf ( )
static

Definition at line 134 of file XrdPfc.cc.

134{ return m_instance->RefConfiguration(); }

Referenced by XrdPfc::ResourceMonitor::heart_beat(), XrdPfc::OldStylePurgeDriver(), XrdPfc::ResourceMonitor::perform_purge_check(), Proto_ResourceMonitorHeartBeat(), XrdPfc::ResourceMonitor::update_vs_and_file_usage_info(), and XrdPfc::DataFsSnapshot::write_json_file().

+ Here is the caller graph for this function:

◆ Config()

bool Cache::Config ( const char * config_filename,
const char * parameters )

Parse configuration file.

Parameters
config_filename: path to the configuration file
parameters: optional parameters to be passed
Returns
parse status

Definition at line 409 of file XrdPfcConfiguration.cc.

410{
411 // Indicate whether or not we are a client instance
412 const char *theINS = getenv("XRDINSTANCE");
413 m_isClient = (theINS != 0 && strncmp("*client ", theINS, 8) == 0);
414
415 // Tell everyone else we are a caching proxy
416 XrdOucEnv::Export("XRDPFC", 1);
417
418 XrdOucEnv myEnv;
419 XrdOucStream Config(&m_log, theINS, &myEnv, "=====> ");
420
421 if (! config_filename || ! *config_filename)
422 {
423 TRACE(Error, "Config() configuration file not specified.");
424 return false;
425 }
426
427 int fd;
428 if ( (fd = open(config_filename, O_RDONLY, 0)) < 0)
429 {
430 TRACE( Error, "Config() can't open configuration file " << config_filename);
431 return false;
432 }
433
434 Config.Attach(fd);
435 static const char *cvec[] = { "*** pfc plugin config:", 0 };
436 Config.Capture(cvec);
437
438 // Obtain OFS configurator for OSS plugin.
439 XrdOfsConfigPI *ofsCfg = XrdOfsConfigPI::New(config_filename,&Config,&m_log,
440 &XrdVERSIONINFOVAR(XrdOucGetCache));
441 if (! ofsCfg) return false;
442
443 TmpConfiguration tmpc;
444
445 // Adjust default parameters for client/serverless caching
446 if (m_isClient)
447 {
448 m_configuration.m_bufferSize = 128 * 1024; // same as normal.
449 m_configuration.m_wqueue_blocks = 8;
450 m_configuration.m_wqueue_threads = 1;
451 }
452
453 // If network checksum processing is the default, indicate so.
454 if (m_configuration.is_cschk_net()) m_env->Put("psx.CSNet", m_configuration.m_cs_ChkTLS ? "2" : "1");
455
456 // Actual parsing of the config file.
457 bool retval = true, aOK = true;
458 char *var;
459 while ((var = Config.GetMyFirstWord()))
460 {
461 if (! strcmp(var,"pfc.osslib"))
462 {
463 retval = ofsCfg->Parse(XrdOfsConfigPI::theOssLib);
464 }
465 else if (! strcmp(var,"pfc.cschk"))
466 {
467 retval = xcschk(Config);
468 }
469 else if (! strcmp(var,"pfc.decisionlib"))
470 {
471 retval = xdlib(Config);
472 }
473 else if (! strcmp(var,"pfc.purgelib"))
474 {
475 retval = xplib(Config);
476 }
477 else if (! strcmp(var,"pfc.trace"))
478 {
479 retval = xtrace(Config);
480 }
481 else if (! strcmp(var,"pfc.allow_xrdpfc_command"))
482 {
483 m_configuration.m_allow_xrdpfc_command = true;
484 }
485 else if (! strncmp(var,"pfc.", 4))
486 {
487 retval = ConfigParameters(std::string(var+4), Config, tmpc);
488 }
489
490 if ( ! retval)
491 {
492 TRACE(Error, "Config() error in parsing");
493 aOK = false;
494 }
495 }
496
497 Config.Close();
498
499 // Load OSS plugin.
500 myEnv.Put("oss.runmode", "pfc");
501 if (m_configuration.is_cschk_cache())
502 {
503 char csi_conf[128];
504 if (snprintf(csi_conf, 128, "space=%s nofill", m_configuration.m_meta_space.c_str()) < 128)
505 {
506 ofsCfg->Push(XrdOfsConfigPI::theOssLib, "libXrdOssCsi.so", csi_conf);
507 } else {
508 TRACE(Error, "Config() buffer too small for libXrdOssCsi params.");
509 return false;
510 }
511 }
512 if (ofsCfg->Load(XrdOfsConfigPI::theOssLib, &myEnv))
513 {
514 ofsCfg->Plugin(m_oss);
515 }
516 else
517 {
518 TRACE(Error, "Config() Unable to create an OSS object");
519 return false;
520 }
521
522 // Test if OSS is operational, determine optional features.
523 aOK &= test_oss_basics_and_features();
524
525 // sets default value for disk usage
526 XrdOssVSInfo sP;
527 {
528 if (m_configuration.m_meta_space != m_configuration.m_data_space &&
529 m_oss->StatVS(&sP, m_configuration.m_meta_space.c_str(), 1) < 0)
530 {
531 m_log.Emsg("ConfigParameters()", "error obtaining stat info for meta space ", m_configuration.m_meta_space.c_str());
532 return false;
533 }
534 if (m_configuration.m_meta_space != m_configuration.m_data_space && sP.Total < 10ll << 20)
535 {
536 m_log.Emsg("ConfigParameters()", "available data space is less than 10 MB (can be due to a mistake in oss.localroot directive) for space ",
537 m_configuration.m_meta_space.c_str());
538 return false;
539 }
540 if (m_oss->StatVS(&sP, m_configuration.m_data_space.c_str(), 1) < 0)
541 {
542 m_log.Emsg("ConfigParameters()", "error obtaining stat info for data space ", m_configuration.m_data_space.c_str());
543 return false;
544 }
545 if (sP.Total < 10ll << 20)
546 {
547 m_log.Emsg("ConfigParameters()", "available data space is less than 10 MB (can be due to a mistake in oss.localroot directive) for space ",
548 m_configuration.m_data_space.c_str());
549 return false;
550 }
551
552 m_configuration.m_diskTotalSpace = sP.Total;
553
554 if (cfg2bytes(tmpc.m_diskUsageLWM, m_configuration.m_diskUsageLWM, sP.Total, "lowWatermark") &&
555 cfg2bytes(tmpc.m_diskUsageHWM, m_configuration.m_diskUsageHWM, sP.Total, "highWatermark"))
556 {
557 if (m_configuration.m_diskUsageLWM >= m_configuration.m_diskUsageHWM) {
558 m_log.Emsg("ConfigParameters()", "pfc.diskusage should have lowWatermark < highWatermark.");
559 aOK = false;
560 }
561 }
562 else aOK = false;
563
564 if ( ! tmpc.m_fileUsageMax.empty())
565 {
566 if (cfg2bytes(tmpc.m_fileUsageBaseline, m_configuration.m_fileUsageBaseline, sP.Total, "files baseline") &&
567 cfg2bytes(tmpc.m_fileUsageNominal, m_configuration.m_fileUsageNominal, sP.Total, "files nominal") &&
568 cfg2bytes(tmpc.m_fileUsageMax, m_configuration.m_fileUsageMax, sP.Total, "files max"))
569 {
570 if (m_configuration.m_fileUsageBaseline >= m_configuration.m_fileUsageNominal ||
571 m_configuration.m_fileUsageBaseline >= m_configuration.m_fileUsageMax ||
572 m_configuration.m_fileUsageNominal >= m_configuration.m_fileUsageMax)
573 {
574 m_log.Emsg("ConfigParameters()", "pfc.diskusage files should have baseline < nominal < max.");
575 aOK = false;
576 }
577
578
579 if (aOK && m_configuration.m_fileUsageMax >= m_configuration.m_diskUsageLWM)
580 {
581 m_log.Emsg("ConfigParameters()", "pfc.diskusage files values must be below lowWatermark");
582 aOK = false;
583 }
584 }
585 else aOK = false;
586 }
587 }
588
589 // sets flush frequency
590 if ( ! tmpc.m_flushRaw.empty())
591 {
592 if (::isalpha(*(tmpc.m_flushRaw.rbegin())))
593 {
594 if (XrdOuca2x::a2sz(m_log, "Error getting number of bytes written before flush", tmpc.m_flushRaw.c_str(),
595 &m_configuration.m_flushCnt,
596 100 * m_configuration.m_bufferSize , 100000 * m_configuration.m_bufferSize))
597 {
598 return false;
599 }
600 m_configuration.m_flushCnt /= m_configuration.m_bufferSize;
601 }
602 else
603 {
604 if (XrdOuca2x::a2ll(m_log, "Error getting number of blocks written before flush", tmpc.m_flushRaw.c_str(),
605 &m_configuration.m_flushCnt, 100, 100000))
606 {
607 return false;
608 }
609 }
610 }
611
612 // set the write mode
613 if ( ! tmpc.m_writemodeRaw.empty())
614 {
615 if (tmpc.m_writemodeRaw == "writethrough")
616 {
617 m_configuration.m_write_through = true;
618 }
619 else if (tmpc.m_writemodeRaw != "off")
620 {
621 m_log.Emsg("ConfigParameters()", "Unknown value for pfc.writemode (valid values are `writethrough` or `off`): %s",
622 tmpc.m_writemodeRaw.c_str());
623 return false;
624 }
625 }
626
627 // get number of available RAM blocks after process configuration
628 if (m_configuration.m_RamAbsAvailable == 0)
629 {
630 m_configuration.m_RamAbsAvailable = m_isClient ? 256ll * 1024 * 1024 : 1024ll * 1024 * 1024;
631 char buff[1024];
632 snprintf(buff, sizeof(buff), "RAM usage pfc.ram is not specified. Default value %s is used.", m_isClient ? "256m" : "1g");
633 m_log.Say("Config info: ", buff);
634 }
635 // Setup number of standard-size blocks not released back to the system to 5% of total RAM.
636 m_configuration.m_RamKeepStdBlocks = (m_configuration.m_RamAbsAvailable / m_configuration.m_bufferSize + 1) * 5 / 100;
637
638 // Set tracing to debug if this is set in environment
639 char* cenv = getenv("XRDDEBUG");
640 if (cenv && ! strcmp(cenv,"1") && m_trace->What < 4) m_trace->What = 4;
641
642 if (aOK)
643 {
644 int loff = 0;
645// 000 001 010
646 const char *csc[] = {"off", "cache nonet", "nocache net notls",
647// 011
648 "cache net notls",
649// 100 101 110
650 "off", "cache nonet", "nocache net tls",
651// 111
652 "cache net tls"};
653 char buff[8192], uvk[32];
654 if (m_configuration.m_cs_UVKeep < 0)
655 strcpy(uvk, "lru");
656 else
657 sprintf(uvk, "%lld", (long long) m_configuration.m_cs_UVKeep);
658 float rg = (m_configuration.m_RamAbsAvailable) / float(1024*1024*1024);
659 loff = snprintf(buff, sizeof(buff), "Config effective %s pfc configuration:\n"
660 " pfc.cschk %s uvkeep %s\n"
661 " pfc.blocksize %lld\n"
662 " pfc.prefetch %d\n"
663 " pfc.ram %.fg\n"
664 " pfc.writequeue %d %d\n"
665 " # Total available disk: %lld\n"
666 " pfc.diskusage %lld %lld files %lld %lld %lld purgeinterval %d purgecoldfiles %d\n"
667 " pfc.spaces %s %s\n"
668 " pfc.trace %d\n"
669 " pfc.flush %lld\n"
670 " pfc.acchistorysize %d\n"
671 " pfc.onlyIfCachedMinBytes %lld\n"
672 " pfc.onlyIfCachedMinFrac %.2f\n",
673 config_filename,
674 csc[int(m_configuration.m_cs_Chk)], uvk,
675 m_configuration.m_bufferSize,
676 m_configuration.m_prefetch_max_blocks,
677 rg,
678 m_configuration.m_wqueue_blocks, m_configuration.m_wqueue_threads,
679 sP.Total,
680 m_configuration.m_diskUsageLWM, m_configuration.m_diskUsageHWM,
681 m_configuration.m_fileUsageBaseline, m_configuration.m_fileUsageNominal, m_configuration.m_fileUsageMax,
682 m_configuration.m_purgeInterval, m_configuration.m_purgeColdFilesAge,
683 m_configuration.m_data_space.c_str(),
684 m_configuration.m_meta_space.c_str(),
685 m_trace->What,
686 m_configuration.m_flushCnt,
687 m_configuration.m_accHistorySize,
688 m_configuration.m_onlyIfCachedMinSize,
689 m_configuration.m_onlyIfCachedMinFrac);
690
691 if (m_configuration.is_dir_stat_reporting_on())
692 {
693 loff += snprintf(buff + loff, sizeof(buff) - loff,
694 " pfc.dirstats interval %d maxdepth %d ((internal: store_depth %d, size_of_dirlist %d, size_of_globlist %d))\n",
695 m_configuration.m_dirStatsInterval, m_configuration.m_dirStatsMaxDepth, m_configuration.m_dirStatsStoreDepth,
696 (int) m_configuration.m_dirStatsDirs.size(), (int) m_configuration.m_dirStatsDirGlobs.size());
697 loff += snprintf(buff + loff, sizeof(buff) - loff, " dirlist:\n");
698 for (std::set<std::string>::iterator i = m_configuration.m_dirStatsDirs.begin(); i != m_configuration.m_dirStatsDirs.end(); ++i)
699 loff += snprintf(buff + loff, sizeof(buff) - loff, " %s\n", i->c_str());
700 loff += snprintf(buff + loff, sizeof(buff) - loff, " globlist:\n");
701 for (std::set<std::string>::iterator i = m_configuration.m_dirStatsDirGlobs.begin(); i != m_configuration.m_dirStatsDirGlobs.end(); ++i)
702 loff += snprintf(buff + loff, sizeof(buff) - loff, " %s/*\n", i->c_str());
703 }
704
705 if (m_configuration.m_hdfsmode)
706 {
707 loff += snprintf(buff + loff, sizeof(buff) - loff, " pfc.hdfsmode hdfsbsize %lld\n", m_configuration.m_hdfsbsize);
708 }
709 loff += snprintf(buff + loff, sizeof(buff) - loff, " pfc.writemode %s\n", m_configuration.m_write_through ? "writethrough" : "off");
710
711 if (m_configuration.m_username.empty())
712 {
713 char unameBuff[256];
714 XrdOucUtils::UserName(getuid(), unameBuff, sizeof(unameBuff));
715 m_configuration.m_username = unameBuff;
716 }
717 else
718 {
719 loff += snprintf(buff + loff, sizeof(buff) - loff, " pfc.user %s\n", m_configuration.m_username.c_str());
720 }
721
722 m_log.Say(buff);
723
724 m_env->Put("XRDPFC.SEGSIZE", std::to_string(m_configuration.m_bufferSize).c_str());
725 }
726
727 // Derived settings
728 m_prefetch_enabled = m_configuration.m_prefetch_max_blocks > 0;
729 Info::s_maxNumAccess = m_configuration.m_accHistorySize;
730
731 m_gstream = (XrdXrootdGStream*) m_env->GetPtr("pfc.gStream*");
732
733 m_log.Say(" pfc g-stream has", m_gstream ? "" : " NOT", " been configured via xrootd.monitor directive\n");
734
735 // Create the ResourceMonitor and get it ready for starting the main thread function.
736 if (aOK)
737 {
738 m_res_mon = new ResourceMonitor(*m_oss);
739 m_res_mon->init_before_main();
740 }
741
742 m_log.Say("=====> Proxy file cache configuration parsing ", aOK ? "completed" : "failed");
743
744 if (ofsCfg) delete ofsCfg;
745
746 // XXXX-CKSUM Testing. To be removed after OssPgi is also merged and valildated.
747 // Building of xrdpfc_print fails when this is enabled.
748#ifdef XRDPFC_CKSUM_TEST
749 {
750 int xxx = m_configuration.m_cs_Chk;
751
752 for (m_configuration.m_cs_Chk = CSChk_None; m_configuration.m_cs_Chk <= CSChk_Both; ++m_configuration.m_cs_Chk)
753 {
754 Info::TestCksumStuff();
755 }
756
757 m_configuration.m_cs_Chk = xxx;
758 }
759#endif
760
761 return aOK;
762}
XrdOucCache * XrdOucGetCache(XrdSysLogger *logger, const char *config_filename, const char *parameters, XrdOucEnv *env)
Definition XrdPfc.cc:76
#define open
Definition XrdPosix.hh:76
bool Parse(TheLib what)
bool Plugin(XrdAccAuthorize *&piP)
Get Authorization plugin.
static XrdOfsConfigPI * New(const char *cfn, XrdOucStream *cfgP, XrdSysError *errP, XrdVersionInfo *verP=0, XrdSfsFileSystem *sfsP=0)
bool Load(int what, XrdOucEnv *envP=0)
bool Push(TheLib what, const char *plugP, const char *parmP=0)
@ theOssLib
Oss plugin.
long long Total
Definition XrdOssVS.hh:90
static int Export(const char *Var, const char *Val)
Definition XrdOucEnv.cc:188
void Put(const char *varname, const char *value)
Definition XrdOucEnv.hh:85
static int UserName(uid_t uID, char *uName, int uNsz)
static int a2sz(XrdSysError &, const char *emsg, const char *item, long long *val, long long minv=-1, long long maxv=-1)
Definition XrdOuca2x.cc:257
static int a2ll(XrdSysError &, const char *emsg, const char *item, long long *val, long long minv=-1, long long maxv=-1)
Definition XrdOuca2x.cc:70
bool Config(const char *config_filename, const char *parameters)
Parse configuration file.
static size_t s_maxNumAccess
std::string m_writemodeRaw
Definition XrdPfc.hh:137
std::string m_diskUsageLWM
Definition XrdPfc.hh:131
std::string m_diskUsageHWM
Definition XrdPfc.hh:132
std::string m_fileUsageBaseline
Definition XrdPfc.hh:133
std::string m_fileUsageNominal
Definition XrdPfc.hh:134
std::string m_flushRaw
Definition XrdPfc.hh:136
std::string m_fileUsageMax
Definition XrdPfc.hh:135

References XrdOuca2x::a2ll(), XrdOuca2x::a2sz(), Config(), XrdPfc::CSChk_Both, XrdPfc::CSChk_None, Error, XrdOucEnv::Export(), XrdOfsConfigPI::Load(), XrdPfc::TmpConfiguration::m_diskUsageHWM, XrdPfc::TmpConfiguration::m_diskUsageLWM, XrdPfc::TmpConfiguration::m_fileUsageBaseline, XrdPfc::TmpConfiguration::m_fileUsageMax, XrdPfc::TmpConfiguration::m_fileUsageNominal, XrdPfc::TmpConfiguration::m_flushRaw, XrdPfc::TmpConfiguration::m_writemodeRaw, XrdOfsConfigPI::New(), open, XrdOfsConfigPI::Parse(), XrdOfsConfigPI::Plugin(), XrdOfsConfigPI::Push(), XrdOucEnv::Put(), XrdPfc::Info::s_maxNumAccess, XrdOfsConfigPI::theOssLib, XrdOssVSInfo::Total, TRACE, XrdOucUtils::UserName(), and XrdOucGetCache().

Referenced by Config(), and XrdOucGetCache().

+ Here is the call graph for this function:
+ Here is the caller graph for this function:

◆ ConsiderCached()

int Cache::ConsiderCached ( const char * curl)
virtual
Returns
0 - the file is complete and the local path to the file is in the buffer, if it has been supplied.
<0 - the request could not be fulfilled. The return value is -errno describing why.
>0 - Reserved for future use.

Definition at line 1003 of file XrdPfc.cc.

1004{
1005 static const char* tpfx = "ConsiderCached ";
1006
1007 TRACE(Debug, tpfx << curl);
1008
1009 XrdCl::URL url(curl);
1010 std::string f_name = url.GetPath();
1011
1012 File *file = nullptr;
1013 {
1014 XrdSysCondVarHelper lock(&m_active_cond);
1015 auto it = m_active.find(f_name);
1016 if (it != m_active.end()) {
1017 file = it->second;
1018 // If the file-open is in progress, `file` is a nullptr
1019 // so we cannot increase the reference count. For now,
1020 // simply treat it as if the file open doesn't exist instead
1021 // of trying to wait and see if it succeeds.
1022 if (file) {
1023 inc_ref_cnt(file, false, false);
1024 }
1025 }
1026 }
1027 if (file) {
1028 struct stat sbuff;
1029 int res = file->Fstat(sbuff);
1030 dec_ref_cnt(file, false);
1031 if (res)
1032 return res;
1033 // DecideIfConsideredCached() already called in File::Fstat().
1034 return sbuff.st_atime > 0 ? 0 : -EREMOTE;
1035 }
1036
1037 struct stat sbuff;
1038 int res = m_oss->Stat(f_name.c_str(), &sbuff);
1039 if (res != XrdOssOK) {
1040 TRACE(Debug, tpfx << curl << " -> " << res);
1041 return res;
1042 }
1043 if (S_ISDIR(sbuff.st_mode))
1044 {
1045 TRACE(Debug, tpfx << curl << " -> EISDIR");
1046 return -EISDIR;
1047 }
1048
1049 long long file_size = DetermineFullFileSize(f_name + Info::s_infoExtension);
1050 if (file_size < 0) {
1051 TRACE(Debug, tpfx << curl << " -> " << file_size);
1052 return (int) file_size;
1053 }
1054 bool is_cached = DecideIfConsideredCached(file_size, sbuff.st_blocks * 512ll);
1055
1056 return is_cached ? 0 : -EREMOTE;
1057}
#define XrdOssOK
Definition XrdOss.hh:50
#define stat(a, b)
Definition XrdPosix.hh:101
XrdOucString File
long long DetermineFullFileSize(const std::string &cinfo_fname)
Definition XrdPfc.cc:929
bool DecideIfConsideredCached(long long file_size, long long bytes_on_disk)
Definition XrdPfc.cc:970
int Fstat(struct stat &sbuff)
static const char * s_infoExtension

References Debug, DecideIfConsideredCached(), DetermineFullFileSize(), XrdPfc::File::Fstat(), XrdCl::URL::GetPath(), XrdPfc::Info::s_infoExtension, stat, TRACE, and XrdOssOK.

+ Here is the call graph for this function:

◆ CreateInstance()

Cache & Cache::CreateInstance ( XrdSysLogger * logger,
XrdOucEnv * env )
static

Singleton creation.

Definition at line 125 of file XrdPfc.cc.

126{
127 assert (m_instance == 0);
128 m_instance = new Cache(logger, env);
129 return *m_instance;
130}
Cache(XrdSysLogger *logger, XrdOucEnv *env)
Constructor.
Definition XrdPfc.cc:158

References Cache().

Referenced by XrdOucGetCache().

+ Here is the call graph for this function:
+ Here is the caller graph for this function:

◆ Decide()

bool Cache::Decide ( XrdOucCacheIO * io)

Makes decision if the original XrdOucCacheIO should be cached.

Parameters
io: cache-IO object whose Path() gives the URL of the file
Returns
decision if IO object will be cached.

Definition at line 137 of file XrdPfc.cc.

138{
139 if (! m_decisionpoints.empty())
140 {
141 XrdCl::URL url(io->Path());
142 std::string filename = url.GetPath();
143 std::vector<Decision*>::const_iterator it;
144 for (it = m_decisionpoints.begin(); it != m_decisionpoints.end(); ++it)
145 {
146 XrdPfc::Decision *d = *it;
147 if (! d) continue;
148 if (! d->Decide(filename, *m_oss))
149 {
150 return false;
151 }
152 }
153 }
154
155 return true;
156}
virtual bool Decide(const std::string &, XrdOss &) const =0

References XrdPfc::Decision::Decide(), XrdCl::URL::GetPath(), and XrdOucCacheIO::Path().

Referenced by Attach().

+ Here is the call graph for this function:
+ Here is the caller graph for this function:

◆ DecideIfConsideredCached()

bool Cache::DecideIfConsideredCached ( long long file_size,
long long bytes_on_disk )

Definition at line 970 of file XrdPfc.cc.

971{
972 if (file_size == 0 || bytes_on_disk >= file_size)
973 return true;
974
975 double frac_on_disk = (double) bytes_on_disk / file_size;
976
977 if (file_size <= m_configuration.m_onlyIfCachedMinSize)
978 {
979 if (frac_on_disk >= m_configuration.m_onlyIfCachedMinFrac)
980 return true;
981 }
982 else
983 {
984 if (bytes_on_disk >= m_configuration.m_onlyIfCachedMinSize &&
985 frac_on_disk >= m_configuration.m_onlyIfCachedMinFrac)
986 return true;
987 }
988 return false;
989}

Referenced by ConsiderCached(), and Stat().

+ Here is the caller graph for this function:

◆ DeRegisterPrefetchFile()

void Cache::DeRegisterPrefetchFile ( File * file)

Definition at line 714 of file XrdPfc.cc.

715{
716 // Can be called with other locks held.
717
718 if ( ! m_prefetch_enabled)
719 {
720 return;
721 }
722
723 m_prefetch_condVar.Lock();
724 for (PrefetchList::iterator it = m_prefetchList.begin(); it != m_prefetchList.end(); ++it)
725 {
726 if (*it == file)
727 {
728 m_prefetchList.erase(it);
729 break;
730 }
731 }
732 m_prefetch_condVar.UnLock();
733}

◆ DetermineFullFileSize()

long long Cache::DetermineFullFileSize ( const std::string & cinfo_fname)

Definition at line 929 of file XrdPfc.cc.

930{
931 if (m_metaXattr) {
932 char pfn[4096];
933 m_oss->Lfn2Pfn(cinfo_fname.c_str(), pfn, 4096);
934 long long fsize = -1ll;
935 int res = XrdSysXAttrActive->Get("pfc.fsize", &fsize, sizeof(long long), pfn);
936 if (res == sizeof(long long))
937 {
938 return fsize;
939 }
940 else
941 {
942 TRACE(Debug, "DetermineFullFileSize error getting xattr " << res);
943 }
944 }
945
946 XrdOssDF *infoFile = m_oss->newFile(m_configuration.m_username.c_str());
947 XrdOucEnv env;
948 long long ret;
949 int res = infoFile->Open(cinfo_fname.c_str(), O_RDONLY, 0600, env);
950 if (res < 0) {
951 ret = res;
952 } else {
953 Info info(m_trace, 0);
954 if ( ! info.Read(infoFile, cinfo_fname.c_str())) {
955 ret = -EBADF;
956 } else {
957 ret = info.GetFileSize();
958 }
959 infoFile->Close();
960 }
961 delete infoFile;
962 return ret;
963}
XrdSysXAttr * XrdSysXAttrActive
virtual int Close(long long *retsz=0)=0
virtual int Open(const char *path, int Oflag, mode_t Mode, XrdOucEnv &env)
Definition XrdOss.hh:200
virtual int Get(const char *Aname, void *Aval, int Avsz, const char *Path, int fd=-1)=0

References XrdOssDF::Close(), Debug, XrdPfc::Info::GetFileSize(), XrdOssDF::Open(), XrdPfc::Info::Read(), TRACE, and XrdSysXAttrActive.

Referenced by ConsiderCached(), and Stat().

+ Here is the call graph for this function:
+ Here is the caller graph for this function:

◆ ExecuteCommandUrl()

void Cache::ExecuteCommandUrl ( const std::string & command_url)

Definition at line 51 of file XrdPfcCommand.cc.

52{
53 static const char *top_epfx = "ExecuteCommandUrl ";
54
55 SplitParser cp(command_url, "/");
56
57 std::string token = cp.get_token();
58
59 if (token != "xrdpfc_command")
60 {
61 TRACE(Error, top_epfx << "First token is NOT xrdpfc_command.");
62 return;
63 }
64
65 // Get the command
66 token = cp.get_token_as_string();
67
68 auto get_opt = [](SplitParser &sp) -> char {
69 const char *t = sp.get_token();
70 if (t)
71 return (t[0] == '-' && t[1] != 0) ? t[1] : 0;
72 else
73 return -1;
74 };
75
76 //================================================================
77 // create_file
78 //================================================================
79
80 if (token == "create_file")
81 {
82 static const char* err_prefix = "ExecuteCommandUrl: /xrdpfc_command/create_file: ";
83 static const char* usage =
84 "Usage: create_file/ [-h] [-s filesize] [-b blocksize] [-t access_time] [-d access_duration]/<path>\n"
85 " Creates a cache file with given parameters. Data in file is random.\n"
86 " Useful for cache purge testing.\n"
87 "Notes:\n"
88 " . If no options are needed one should still leave a space between / separators, ie., '/ /'\n"
89 " . Default filesize=1G, blocksize=<as configured>, access_time=-10, access_duration=10.\n"
90 " . -t and -d can be given multiple times to record several accesses.\n"
91 " . Negative arguments given to -t are interpreted as relative to now.\n";
92
93 const Configuration &conf = m_configuration;
94
95 token = cp.get_token_as_string();
96 TRACE(Debug, err_prefix << "Entered with argument string '" << token <<"'.");
97 if (token.empty()) {
98 TRACE(Error, err_prefix << "Options section must not be empty, a single space character is OK.");
99 return;
100 }
101 TRACE(Debug, err_prefix << "File path (reminder of URL) is '" << cp.get_reminder() <<"'.");
102 if ( ! cp.has_reminder()) {
103 TRACE(Error, err_prefix << "Path section must not be empty.");
104 return;
105 }
106
107 long long file_size = ONE_GB;
108 long long block_size = conf.m_bufferSize;
109 int access_time [MAX_ACCESSES];
110 int access_duration[MAX_ACCESSES];
111 int at_count = 0, ad_count = 0;
112
113 time_t time_now = time(0);
114
115 SplitParser ap(token, " ");
116 char theOpt;
117
118 while ((theOpt = get_opt(ap)) != (char) -1)
119 {
120 switch (theOpt)
121 {
122 case 'h': {
123 m_log.Say(err_prefix, " -- printing help, no action will be taken\n", usage);
124 return;
125 }
126 case 's': {
127 if (XrdOuca2x::a2sz(m_log, "Error getting filesize", ap.get_token(),
128 &file_size, 0ll, 32 * ONE_GB))
129 return;
130 break;
131 }
132 case 'b': {
133 if (XrdOuca2x::a2sz(m_log, "Error getting blocksize", ap.get_token(),
134 &block_size, 0ll, 64 * ONE_MB))
135 return;
136 break;
137 }
138 case 't': {
139 if (XrdOuca2x::a2i(m_log, "Error getting access time", ap.get_token(),
140 &access_time[at_count++], INT_MIN, INT_MAX))
141 return;
142 break;
143 }
144 case 'd': {
145 if (XrdOuca2x::a2i(m_log, "Error getting access duration", ap.get_token(),
146 &access_duration[ad_count++], 0, 24 * 3600))
147 return;
148 break;
149 }
150 default: {
151 TRACE(Error, err_prefix << "Unhandled command argument.");
152 return;
153 }
154 }
155 }
156
157 if (at_count < 1) access_time [at_count++] = time_now - 10;
158 if (ad_count < 1) access_duration[ad_count++] = 10;
159
160 if (at_count != ad_count)
161 {
162 TRACE(Error, err_prefix << "Options -t and -d must be given the same number of times.");
163 return;
164 }
165
166 std::string file_path (cp.get_reminder_with_delim());
167 std::string cinfo_path(file_path + Info::s_infoExtension);
168
169 TRACE(Debug, err_prefix << "Command arguments parsed successfully. Proceeding to create file " << file_path);
170
171 // Check if cinfo exists ... bail out if it does.
172 {
173 struct stat infoStat;
174 if (GetOss()->Stat(cinfo_path.c_str(), &infoStat) == XrdOssOK)
175 {
176 TRACE(Error, err_prefix << "cinfo file already exists for '" << file_path << "'. Refusing to overwrite.");
177 return;
178 }
179 }
180
181 TRACE(Debug, err_prefix << "Command arguments parsed successfully, proceeding to execution.");
182
183 {
184 const char *myUser = conf.m_username.c_str();
185 XrdOucEnv myEnv;
186
187 // Create the data file.
188
189 char size_str[32]; sprintf(size_str, "%lld", file_size);
190 myEnv.Put("oss.asize", size_str);
191 myEnv.Put("oss.cgroup", conf.m_data_space.c_str());
192 int cret;
193 if ((cret = GetOss()->Create(myUser, file_path.c_str(), 0600, myEnv, XRDOSS_mkpath)) != XrdOssOK)
194 {
195 TRACE(Error, err_prefix << "Create failed for data file " << file_path << ERRNO_AND_ERRSTR(-cret));
196 return;
197 }
198
199 XrdOssDF *myFile = GetOss()->newFile(myUser);
200 if ((cret = myFile->Open(file_path.c_str(), O_RDWR, 0600, myEnv)) != XrdOssOK)
201 {
202 TRACE(Error, err_prefix << "Open failed for data file " << file_path << ERRNO_AND_ERRSTR(-cret));
203 delete myFile;
204 return;
205 }
206
207 // Create the info file.
208
209 myEnv.Put("oss.asize", "64k"); // TODO: Calculate? Get it from configuration? Do not know length of access lists ...
210 myEnv.Put("oss.cgroup", conf.m_meta_space.c_str());
211 if ((cret = GetOss()->Create(myUser, cinfo_path.c_str(), 0600, myEnv, XRDOSS_mkpath)) != XrdOssOK)
212 {
213 TRACE(Error, err_prefix << "Create failed for info file " << cinfo_path << ERRNO_AND_ERRSTR(-cret));
214 myFile->Close(); delete myFile;
215 return;
216 }
217
218 XrdOssDF *myInfoFile = GetOss()->newFile(myUser);
219 if ((cret = myInfoFile->Open(cinfo_path.c_str(), O_RDWR, 0600, myEnv)) != XrdOssOK)
220 {
221 TRACE(Error, err_prefix << "Open failed for info file " << cinfo_path << ERRNO_AND_ERRSTR(-cret));
222 delete myInfoFile;
223 myFile->Close(); delete myFile;
224 return;
225 }
226
227 // Allocate space for the data file.
228
229 if ((cret = posix_fallocate(myFile->getFD(), 0, file_size)))
230 {
231 TRACE(Error, err_prefix << "posix_fallocate failed for data file " << file_path << ERRNO_AND_ERRSTR(cret));
232 }
233
234 // Fill up cinfo.
235
236 Info myInfo(m_trace, false);
237 myInfo.SetBufferSizeFileSizeAndCreationTime(block_size, file_size);
238 myInfo.SetAllBitsSynced();
239
240 for (int i = 0; i < at_count; ++i)
241 {
242 time_t att_time = access_time[i] >= 0 ? access_time[i] : time_now + access_time[i];
243
244 myInfo.WriteIOStatSingle(file_size, att_time, att_time + access_duration[i]);
245 }
246
247 myInfo.Write(myInfoFile, cinfo_path.c_str());
248
249 // Fake last modified time to the last access_time
250 {
251 time_t last_detach;
252 myInfo.GetLatestDetachTime(last_detach);
253 struct timespec acc_mod_time[2] = { {last_detach, UTIME_OMIT}, {last_detach, 0} };
254
255 futimens(myInfoFile->getFD(), acc_mod_time);
256 }
257
258 myInfoFile->Close(); delete myInfoFile;
259 myFile->Close(); delete myFile;
260
261 struct stat dstat;
262 GetOss()->Stat(file_path.c_str(), &dstat);
263 TRACE(Info, err_prefix << "Created file '" << file_path << "', size=" << (file_size>>20) << "MB, "
264 << "st_blocks=" << dstat.st_blocks);
265
266 {
267 XrdSysCondVarHelper lock(&m_writeQ.condVar);
268
269 m_writeQ.writes_between_purges += file_size;
270 }
271 {
272 int token = m_res_mon->register_file_open(file_path, time_now, false);
273 XrdPfc::Stats stats;
274 stats.m_BytesWritten = file_size;
275 stats.m_StBlocksAdded = dstat.st_blocks;
276 m_res_mon->register_file_update_stats(token, stats);
277 m_res_mon->register_file_close(token, time(0), stats);
278 }
279 }
280 }
281
282 //================================================================
283 // remove_file
284 //================================================================
285
286 else if (token == "remove_file")
287 {
288 static const char* err_prefix = "ExecuteCommandUrl: /xrdpfc_command/remove_file: ";
289 static const char* usage =
290 "Usage: remove_file/ [-h] /<path>\n"
291 " Removes given file from the cache unless it is currently open.\n"
292 " Useful for removal of stale files or duplicate files in a caching cluster.\n"
293 "Notes:\n"
294 " . If no options are needed one should still leave a space between / separators, ie., '/ /'\n";
295
296 token = cp.get_token_as_string();
297 TRACE(Debug, err_prefix << "Entered with argument string '" << token <<"'.");
298 if (token.empty()) {
299 TRACE(Error, err_prefix << "Options section must not be empty, a single space character is OK.");
300 return;
301 }
302 TRACE(Debug, err_prefix << "File path (reminder of URL) is '" << cp.get_reminder() <<"'.");
303 if ( ! cp.has_reminder()) {
304 TRACE(Error, err_prefix << "Path section must not be empty.");
305 return;
306 }
307
308 SplitParser ap(token, " ");
309 char theOpt;
310
311 while ((theOpt = get_opt(ap)) != (char) -1)
312 {
313 switch (theOpt)
314 {
315 case 'h': {
316 m_log.Say(err_prefix, " -- printing help, no action will be taken\n", usage);
317 return;
318 }
319 default: {
320 TRACE(Error, err_prefix << "Unhandled command argument.");
321 return;
322 }
323 }
324 }
325
326 std::string f_name(cp.get_reminder_with_delim());
327
328 TRACE(Debug, err_prefix << "file argument '" << f_name << "'.");
329
330 int ret = UnlinkFile(f_name, true);
331
332 TRACE(Info, err_prefix << "returned with status " << ret);
333 }
334
335 //================================================================
336 // unknown command
337 //================================================================
338
339 else
340 {
341 TRACE(Error, top_epfx << "Unknown or empty command '" << token << "'");
342 }
343}
struct stat Stat
Definition XrdCks.cc:49
void usage()
#define XRDOSS_mkpath
Definition XrdOss.hh:466
#define ERRNO_AND_ERRSTR(err_code)
bool Create
virtual int getFD()
Definition XrdOss.hh:426
virtual XrdOssDF * newFile(const char *tident)=0
virtual int Stat(const char *path, struct stat *buff, int opts=0, XrdOucEnv *envP=0)=0
static int a2i(XrdSysError &, const char *emsg, const char *item, int *val, int minv=-1, int maxv=-1)
Definition XrdOuca2x.cc:45
int UnlinkFile(const std::string &f_name, bool fail_if_open)
Remove cinfo and data files from cache.
Definition XrdPfc.cc:1195
XrdOss * GetOss() const
Definition XrdPfc.hh:270
long long m_StBlocksAdded
number of 512-byte blocks the file has grown by
long long m_BytesWritten
number of bytes written to disk
std::string m_data_space
oss space for data files
Definition XrdPfc.hh:89
long long m_bufferSize
prefetch buffer size, default 1MB
Definition XrdPfc.hh:109
std::string m_meta_space
oss space for metadata files (cinfo)
Definition XrdPfc.hh:90
std::string m_username
username passed to oss plugin
Definition XrdPfc.hh:88

References XrdOuca2x::a2i(), XrdOuca2x::a2sz(), XrdOssDF::Close(), Create, Debug, ERRNO_AND_ERRSTR, Error, XrdPfc::SplitParser::get_reminder(), XrdPfc::SplitParser::get_reminder_with_delim(), XrdPfc::SplitParser::get_token(), XrdPfc::SplitParser::get_token_as_string(), XrdOssDF::getFD(), XrdPfc::Info::GetLatestDetachTime(), GetOss(), XrdPfc::SplitParser::has_reminder(), XrdPfc::Configuration::m_bufferSize, XrdPfc::Stats::m_BytesWritten, XrdPfc::Configuration::m_data_space, XrdPfc::Configuration::m_meta_space, XrdPfc::Stats::m_StBlocksAdded, XrdPfc::Configuration::m_username, XrdOss::newFile(), XrdOssDF::Open(), XrdOucEnv::Put(), XrdPfc::Info::s_infoExtension, XrdPfc::Info::SetAllBitsSynced(), XrdPfc::Info::SetBufferSizeFileSizeAndCreationTime(), Stat, XrdOss::Stat(), stat, TRACE, UnlinkFile(), usage(), XrdPfc::Info::Write(), XrdPfc::Info::WriteIOStatSingle(), XRDOSS_mkpath, and XrdOssOK.

+ Here is the call graph for this function:

◆ FileSyncDone()

void Cache::FileSyncDone ( File * f,
bool high_debug )

Definition at line 546 of file XrdPfc.cc.

547{
548 dec_ref_cnt(f, high_debug);
549}

◆ GetFile()

File * Cache::GetFile ( const std::string & path,
IO * io,
long long off = 0,
long long filesize = 0 )

Definition at line 393 of file XrdPfc.cc.

394{
395 // Called from virtual IOFile constructor.
396
397 TRACE(Debug, "GetFile " << path << ", io " << io);
398
399 ActiveMap_i it;
400
401 {
402 XrdSysCondVarHelper lock(&m_active_cond);
403
404 while (true)
405 {
406 it = m_active.find(path);
407
408 // File is not open or being opened. Mark it as being opened and
409 // proceed to opening it outside of while loop.
410 if (it == m_active.end())
411 {
412 it = m_active.insert(std::make_pair(path, (File*) 0)).first;
413 break;
414 }
415
416 if (it->second != 0)
417 {
418 it->second->AddIO(io);
419 inc_ref_cnt(it->second, false, true);
420
421 return it->second;
422 }
423 else
424 {
425 // Wait for some change in m_active, then recheck.
426 m_active_cond.Wait();
427 }
428 }
429 }
430
431 // This is always true, now that IOFileBlock is unsupported.
432 if (filesize == 0)
433 {
434 struct stat st;
435 int res = io->Fstat(st);
436 if (res < 0) {
437 errno = res;
438 TRACE(Error, "GetFile, could not get valid stat");
439 } else if (res > 0) {
440 errno = ENOTSUP;
441 TRACE(Error, "GetFile, stat returned positive value, this should NOT happen here");
442 } else {
443 filesize = st.st_size;
444 }
445 }
446
447 File *file = 0;
448
449 if (filesize >= 0)
450 {
451 file = File::FileOpen(path, off, filesize);
452 }
453
454 {
455 XrdSysCondVarHelper lock(&m_active_cond);
456
457 if (file)
458 {
459 inc_ref_cnt(file, false, true);
460 it->second = file;
461
462 file->AddIO(io);
463 }
464 else
465 {
466 m_active.erase(it);
467 }
468
469 m_active_cond.Broadcast();
470 }
471
472 return file;
473}
virtual int Fstat(struct stat &sbuff)
static File * FileOpen(const std::string &path, long long offset, long long fileSize)
Static constructor that also does Open. Returns null ptr if Open fails.
void AddIO(IO *io)

References XrdPfc::File::AddIO(), Debug, Error, XrdPfc::File::FileOpen(), XrdOucCacheIO::Fstat(), stat, and TRACE.

Referenced by XrdPfc::IOFile::IOFile().

+ Here is the call graph for this function:
+ Here is the caller graph for this function:

◆ GetGStream()

XrdXrootdGStream * XrdPfc::Cache::GetGStream ( )
inline

Definition at line 288 of file XrdPfc.hh.

288{ return m_gstream; }

◆ GetInstance()

Cache & Cache::GetInstance ( )
static

Singleton access.

Definition at line 132 of file XrdPfc.cc.

132{ return *m_instance; }

References Cache().

Referenced by XrdPfc::IOFile::IOFile(), XrdPfc::IOFileBlock::IOFileBlock(), Attach(), XrdPfc::IOFile::DetachFinalize(), XrdPfc::File::GetLog(), XrdPfc::File::GetTrace(), XrdPfc::ResourceMonitor::perform_purge_check(), XrdPfc::ResourceMonitor::perform_purge_task_cleanup(), PrefetchThread(), Prepare(), ProcessWriteTaskThread(), Proto_ResourceMonitorHeartBeat(), XrdPfc::File::Sync(), and XrdPfc::File::WriteBlockToDisk().

+ Here is the call graph for this function:
+ Here is the caller graph for this function:

◆ GetLog()

XrdSysError * XrdPfc::Cache::GetLog ( )
inline

Definition at line 284 of file XrdPfc.hh.

284{ return &m_log; }

Referenced by XrdPfc::File::GetLog().

+ Here is the caller graph for this function:

◆ GetNextFileToPrefetch()

File * Cache::GetNextFileToPrefetch ( )

Definition at line 736 of file XrdPfc.cc.

737{
738 m_prefetch_condVar.Lock();
739 while (m_prefetchList.empty())
740 {
741 m_prefetch_condVar.Wait();
742 }
743
744 // std::sort(m_prefetchList.begin(), m_prefetchList.end(), myobject);
745
746 size_t l = m_prefetchList.size();
747 int idx = rand() % l;
748 File* f = m_prefetchList[idx];
749
750 m_prefetch_condVar.UnLock();
751 return f;
752}

Referenced by Prefetch().

+ Here is the caller graph for this function:

◆ GetOss()

XrdOss * XrdPfc::Cache::GetOss ( ) const
inline

Definition at line 270 of file XrdPfc.hh.

270{ return m_oss; }

Referenced by ExecuteCommandUrl().

+ Here is the caller graph for this function:

◆ GetPurgePin()

PurgePin * XrdPfc::Cache::GetPurgePin ( ) const
inline

Definition at line 274 of file XrdPfc.hh.

274{ return m_purge_pin; }

Referenced by XrdPfc::ResourceMonitor::perform_purge_check().

+ Here is the caller graph for this function:

◆ GetTrace()

XrdSysTrace * XrdPfc::Cache::GetTrace ( )
inline

Definition at line 285 of file XrdPfc.hh.

285{ return m_trace; }

Referenced by XrdPfc::File::GetTrace().

+ Here is the caller graph for this function:

◆ IsFileActiveOrPurgeProtected()

bool Cache::IsFileActiveOrPurgeProtected ( const std::string & path) const

Definition at line 680 of file XrdPfc.cc.

681{
682 XrdSysCondVarHelper lock(&m_active_cond);
683
684 return m_active.find(path) != m_active.end() ||
685 m_purge_delay_set.find(path) != m_purge_delay_set.end();
686}

◆ LocalFilePath()

int Cache::LocalFilePath ( const char * curl,
char * buff = 0,
int blen = 0,
LFP_Reason why = ForAccess,
bool forall = false )
virtual

Get the path to a file that is complete in the local cache. By default, the file must be complete in the cache (i.e. no blocks are missing). This can be overridden. This path can be used to access the file on the local node.

Returns
0 - the file is complete and the local path to the file is in the buffer, if it has been supplied.
<0 - the request could not be fulfilled. The return value is -errno describing why. If a buffer was supplied and a path could be generated it is returned only if "why" is ForCheck or ForInfo. Otherwise, a null path is returned.
>0 - Reserved for future use.

Reimplemented from XrdOucCache.

Definition at line 797 of file XrdPfc.cc.

799{
800 static const mode_t groupReadable = S_IRUSR | S_IWUSR | S_IRGRP;
801 static const mode_t worldReadable = S_IRUSR | S_IWUSR | S_IRGRP | S_IROTH;
802 static const char *lfpReason[] = { "ForAccess", "ForInfo", "ForPath" };
803
804 TRACE(Debug, "LocalFilePath '" << curl << "', why=" << lfpReason[why]);
805
806 if (buff && blen > 0) buff[0] = 0;
807
808 XrdCl::URL url(curl);
809 std::string f_name = url.GetPath();
810 std::string i_name = f_name + Info::s_infoExtension;
811
812 if (why == ForPath)
813 {
814 int ret = m_oss->Lfn2Pfn(f_name.c_str(), buff, blen);
815 TRACE(Info, "LocalFilePath '" << curl << "', why=" << lfpReason[why] << " -> " << ret);
816 return ret;
817 }
818
819 {
820 XrdSysCondVarHelper lock(&m_active_cond);
821 m_purge_delay_set.insert(f_name);
822 }
823
824 struct stat sbuff, sbuff2;
825 if (m_oss->Stat(f_name.c_str(), &sbuff) == XrdOssOK &&
826 m_oss->Stat(i_name.c_str(), &sbuff2) == XrdOssOK)
827 {
828 if (S_ISDIR(sbuff.st_mode))
829 {
830 TRACE(Info, "LocalFilePath '" << curl << "', why=" << lfpReason[why] << " -> EISDIR");
831 return -EISDIR;
832 }
833 else
834 {
835 bool read_ok = false;
836 bool is_complete = false;
837
838 // Lock and check if the file is active. If NOT, keep the lock
839 // and add dummy access after successful reading of info file.
840 // If it IS active, just release the lock, this ongoing access will
841 // assure the file continues to exist.
842
843 // XXXX How can I just loop over the cinfo file when active?
844 // Can I not get is_complete from the existing file?
845 // Do I still want to inject access record?
846 // Oh, it writes only if not active .... still let's try to use existing File.
847
848 m_active_cond.Lock();
849
850 bool is_active = m_active.find(f_name) != m_active.end();
851
852 if (is_active) m_active_cond.UnLock();
853
854 XrdOssDF* infoFile = m_oss->newFile(m_configuration.m_username.c_str());
855 XrdOucEnv myEnv;
856 int res = infoFile->Open(i_name.c_str(), O_RDWR, 0600, myEnv);
857 if (res >= 0)
858 {
859 Info info(m_trace, 0);
860 if (info.Read(infoFile, i_name.c_str()))
861 {
862 read_ok = true;
863
864 is_complete = info.IsComplete();
865
866 // Add full-size access if reason is for access.
867 if ( ! is_active && is_complete && why == ForAccess)
868 {
869 info.WriteIOStatSingle(info.GetFileSize());
870 info.Write(infoFile, i_name.c_str());
871 }
872 }
873 infoFile->Close();
874 }
875 delete infoFile;
876
877 if ( ! is_active) m_active_cond.UnLock();
878
879 if (read_ok)
880 {
881 if ((is_complete || why == ForInfo) && buff != 0)
882 {
883 int res2 = m_oss->Lfn2Pfn(f_name.c_str(), buff, blen);
884 if (res2 < 0)
885 return res2;
886
887 // Normally, files are owned by us but when direct cache access
888 // is wanted and possible, make sure the file is world readable.
889 if (why == ForAccess)
890 {mode_t mode = (forall ? worldReadable : groupReadable);
891 if (((sbuff.st_mode & worldReadable) != mode)
892 && (m_oss->Chmod(f_name.c_str(), mode) != XrdOssOK))
893 {is_complete = false;
894 *buff = 0;
895 }
896 }
897 }
898
899 TRACE(Info, "LocalFilePath '" << curl << "', why=" << lfpReason[why] <<
900 (is_complete ? " -> FILE_COMPLETE_IN_CACHE" : " -> EREMOTE"));
901
902 return is_complete ? 0 : -EREMOTE;
903 }
904 }
905 }
906
907 TRACE(Info, "LocalFilePath '" << curl << "', why=" << lfpReason[why] << " -> ENOENT");
908 return -ENOENT;
909}

References XrdOssDF::Close(), Debug, XrdOucCache::ForAccess, XrdOucCache::ForInfo, XrdOucCache::ForPath, XrdPfc::Info::GetFileSize(), XrdCl::URL::GetPath(), XrdPfc::Info::IsComplete(), XrdOssDF::Open(), XrdPfc::Info::Read(), XrdPfc::Info::s_infoExtension, stat, TRACE, XrdPfc::Info::Write(), XrdPfc::Info::WriteIOStatSingle(), and XrdOssOK.

+ Here is the call graph for this function:

◆ Prefetch()

void Cache::Prefetch ( )

Definition at line 755 of file XrdPfc.cc.

756{
757 const long long limit_RAM = m_configuration.m_RamAbsAvailable * 7 / 10;
758
759 while (true)
760 {
761 m_RAM_mutex.Lock();
762 bool doPrefetch = (m_RAM_used < limit_RAM);
763 m_RAM_mutex.UnLock();
764
765 if (doPrefetch)
766 {
767 File* f = GetNextFileToPrefetch();
768 f->Prefetch();
769 }
770 else
771 {
773 }
774 }
775}
File * GetNextFileToPrefetch()
Definition XrdPfc.cc:736
static void Wait(int milliseconds)

References GetNextFileToPrefetch(), XrdPfc::File::Prefetch(), and XrdSysTimer::Wait().

Referenced by PrefetchThread().

+ Here is the call graph for this function:
+ Here is the caller graph for this function:

◆ Prepare()

int Cache::Prepare ( const char * curl,
int oflags,
mode_t mode )
virtual

Prepare the cache for a file open request. This method is called prior to actually opening a file. This method is meant to allow deferring an open request or implementing the full I/O stack in the cache layer.

Returns
<0 Error has occurred, return value is -errno; fail open request. =0 Continue with open() request. >0 Defer open but treat the file as actually being open. Use the XrdOucCacheIO::Open() method to open the file at a later time.

Reimplemented from XrdOucCache.

Definition at line 1069 of file XrdPfc.cc.

1070{
1071 XrdCl::URL url(curl);
1072 std::string f_name = url.GetPath();
1073 std::string i_name = f_name + Info::s_infoExtension;
1074
1075 // Do not allow write access.
1076 if ((oflags & O_ACCMODE) != O_RDONLY)
1077 {
1078 if (Cache::GetInstance().RefConfiguration().m_write_through)
1079 {
1080 return 0;
1081 }
1082 TRACE(Warning, "Prepare write access requested on file " << f_name << ". Denying access.");
1083 return -EROFS;
1084 }
1085
1086 // Intercept xrdpfc_command requests.
1087 if (m_configuration.m_allow_xrdpfc_command && strncmp("/xrdpfc_command/", f_name.c_str(), 16) == 0)
1088 {
1089 // Schedule a job to process command request.
1090 {
1091 CommandExecutor *ce = new CommandExecutor(f_name, "CommandExecutor");
1092
1093 schedP->Schedule(ce);
1094 }
1095
1096 return -EAGAIN;
1097 }
1098
1099 {
1100 XrdSysCondVarHelper lock(&m_active_cond);
1101 m_purge_delay_set.insert(f_name);
1102 }
1103
1104 struct stat sbuff;
1105 if (m_oss->Stat(i_name.c_str(), &sbuff) == XrdOssOK)
1106 {
1107 TRACE(Dump, "Prepare defer open " << f_name);
1108 return 1;
1109 }
1110 else
1111 {
1112 return 0;
1113 }
1114}
static XrdScheduler * schedP
Definition XrdPfc.hh:292

References GetInstance(), XrdCl::URL::GetPath(), RefConfiguration(), XrdPfc::Info::s_infoExtension, schedP, stat, TRACE, and XrdOssOK.

+ Here is the call graph for this function:

◆ ProcessWriteTasks()

void Cache::ProcessWriteTasks ( )

Separate task which writes blocks from RAM to disk.

Definition at line 277 of file XrdPfc.cc.

278{
279 std::vector<Block*> blks_to_write(m_configuration.m_wqueue_blocks);
280
281 while (true)
282 {
283 m_writeQ.condVar.Lock();
284 while (m_writeQ.size == 0)
285 {
286 m_writeQ.condVar.Wait();
287 }
288
289 // MT -- optimize to pop several blocks if they are available (or swap the list).
290 // This makes sense especially for smallish block sizes.
291
292 int n_pushed = std::min(m_writeQ.size, m_configuration.m_wqueue_blocks);
293 long long sum_size = 0;
294
295 for (int bi = 0; bi < n_pushed; ++bi)
296 {
297 Block* block = m_writeQ.queue.front();
298 m_writeQ.queue.pop_front();
299 m_writeQ.writes_between_purges += block->get_size();
300 sum_size += block->get_size();
301
302 blks_to_write[bi] = block;
303
304 TRACE(Dump, "ProcessWriteTasks for block " << (void*)(block) << " path " << block->m_file->lPath());
305 }
306 m_writeQ.size -= n_pushed;
307
308 m_writeQ.condVar.UnLock();
309
310 {
311 XrdSysMutexHelper lock(&m_RAM_mutex);
312 m_RAM_write_queue -= sum_size;
313 }
314
315 for (int bi = 0; bi < n_pushed; ++bi)
316 {
317 Block* block = blks_to_write[bi];
318
319 block->m_file->WriteBlockToDisk(block);
320 }
321 }
322}
const char * lPath() const
Log path.
void WriteBlockToDisk(Block *b)

References XrdPfc::Block::get_size(), XrdPfc::File::lPath(), XrdPfc::Block::m_file, TRACE, and XrdPfc::File::WriteBlockToDisk().

Referenced by ProcessWriteTaskThread().

+ Here is the call graph for this function:
+ Here is the caller graph for this function:

◆ RefConfiguration()

const Configuration & XrdPfc::Cache::RefConfiguration ( ) const
inline

Reference XrdPfc configuration.

Definition at line 206 of file XrdPfc.hh.

206{ return m_configuration; }

Referenced by XrdPfc::IOFileBlock::IOFileBlock(), Attach(), Prepare(), and XrdOucGetCache().

+ Here is the caller graph for this function:

◆ RefResMon()

ResourceMonitor & XrdPfc::Cache::RefResMon ( )
inline

Definition at line 287 of file XrdPfc.hh.

287{ return *m_res_mon; }

◆ RegisterPrefetchFile()

void Cache::RegisterPrefetchFile ( File * file)

Definition at line 698 of file XrdPfc.cc.

699{
700 // Can be called with other locks held.
701
702 if ( ! m_prefetch_enabled)
703 {
704 return;
705 }
706
707 m_prefetch_condVar.Lock();
708 m_prefetchList.push_back(file);
709 m_prefetch_condVar.Signal();
710 m_prefetch_condVar.UnLock();
711}

◆ ReleaseFile()

void Cache::ReleaseFile ( File * f,
IO * io )

Definition at line 475 of file XrdPfc.cc.

476{
477 // Called from virtual IO::DetachFinalize.
478
479 TRACE(Debug, "ReleaseFile " << f->GetLocalPath() << ", io " << io);
480
481 {
482 XrdSysCondVarHelper lock(&m_active_cond);
483
484 f->RemoveIO(io);
485 }
486 dec_ref_cnt(f, true);
487}
void RemoveIO(IO *io)

References Debug, XrdPfc::File::GetLocalPath(), XrdPfc::File::RemoveIO(), and TRACE.

Referenced by XrdPfc::IOFile::DetachFinalize().

+ Here is the call graph for this function:
+ Here is the caller graph for this function:

◆ ReleaseRAM()

void Cache::ReleaseRAM ( char * buf,
long long size )

Definition at line 375 of file XrdPfc.cc.

376{
377 bool std_size = (size == m_configuration.m_bufferSize);
378 {
379 XrdSysMutexHelper lock(&m_RAM_mutex);
380
381 m_RAM_used -= size;
382
383 if (std_size && m_RAM_std_size < m_configuration.m_RamKeepStdBlocks)
384 {
385 m_RAM_std_blocks.push_back(buf);
386 ++m_RAM_std_size;
387 return;
388 }
389 }
390 free(buf);
391}

◆ RemoveWriteQEntriesFor()

void Cache::RemoveWriteQEntriesFor ( File * f)

Remove blocks from the write queue which belong to the given file. This method is used at the time of File destruction.

Definition at line 244 of file XrdPfc.cc.

245{
246 std::list<Block*> removed_blocks;
247 long long sum_size = 0;
248
249 m_writeQ.condVar.Lock();
250 std::list<Block*>::iterator i = m_writeQ.queue.begin();
251 while (i != m_writeQ.queue.end())
252 {
253 if ((*i)->m_file == file)
254 {
255 TRACE(Dump, "Remove entries for " << (void*)(*i) << " path " << file->lPath());
256 std::list<Block*>::iterator j = i++;
257 removed_blocks.push_back(*j);
258 sum_size += (*j)->get_size();
259 m_writeQ.queue.erase(j);
260 --m_writeQ.size;
261 }
262 else
263 {
264 ++i;
265 }
266 }
267 m_writeQ.condVar.UnLock();
268
269 {
270 XrdSysMutexHelper lock(&m_RAM_mutex);
271 m_RAM_write_queue -= sum_size;
272 }
273
274 file->BlocksRemovedFromWriteQ(removed_blocks);
275}

References XrdPfc::File::BlocksRemovedFromWriteQ(), XrdPfc::File::lPath(), and TRACE.

Referenced by UnlinkFile().

+ Here is the call graph for this function:
+ Here is the caller graph for this function:

◆ RequestRAM()

char * Cache::RequestRAM ( long long size)

Definition at line 335 of file XrdPfc.cc.

336{
337 static const size_t s_block_align = sysconf(_SC_PAGESIZE);
338
339 bool std_size = (size == m_configuration.m_bufferSize);
340
341 m_RAM_mutex.Lock();
342
343 long long total = m_RAM_used + size;
344
345 if (total <= m_configuration.m_RamAbsAvailable)
346 {
347 m_RAM_used = total;
348 if (std_size && m_RAM_std_size > 0)
349 {
350 char *buf = m_RAM_std_blocks.back();
351 m_RAM_std_blocks.pop_back();
352 --m_RAM_std_size;
353
354 m_RAM_mutex.UnLock();
355
356 return buf;
357 }
358 else
359 {
360 m_RAM_mutex.UnLock();
361 char *buf;
362 if (posix_memalign((void**) &buf, s_block_align, (size_t) size))
363 {
364 // Report out of mem? Probably should report it at least the first time,
365 // then periodically.
366 return 0;
367 }
368 return buf;
369 }
370 }
371 m_RAM_mutex.UnLock();
372 return 0;
373}

◆ ResMon()

ResourceMonitor & Cache::ResMon ( )
static

Definition at line 135 of file XrdPfc.cc.

135{ return m_instance->RefResMon(); }

Referenced by XrdPfc::ResourceMonitor::perform_purge_check(), ResourceMonitorThread(), and XrdPfc::UnlinkPurgeStateFilesInMap().

+ Here is the caller graph for this function:

◆ ScheduleFileSync()

void XrdPfc::Cache::ScheduleFileSync ( File * f)
inline

Definition at line 280 of file XrdPfc.hh.

280{ schedule_file_sync(f, false, false); }

◆ Stat()

int Cache::Stat ( const char * curl,
struct stat & sbuff )
virtual
Returns
<0 - Stat failed, value is -errno. =0 - Stat succeeded, sbuff holds stat information. >0 - Stat could not be done, forward operation to next level.

Reimplemented from XrdOucCache.

Definition at line 1124 of file XrdPfc.cc.

1125{
1126 const char *tpfx = "Stat ";
1127
1128 XrdCl::URL url(curl);
1129 std::string f_name = url.GetPath();
1130
1131 File *file = nullptr;
1132 {
1133 XrdSysCondVarHelper lock(&m_active_cond);
1134 auto it = m_active.find(f_name);
1135 if (it != m_active.end()) {
1136 file = it->second;
1137 // If `file` is nullptr, the file-open is in progress; instead
1138 // of waiting for the file-open to finish, simply treat it as if
1139 // the file-open doesn't exist.
1140 if (file) {
1141 inc_ref_cnt(file, false, false);
1142 }
1143 }
1144 }
1145 if (file) {
1146 int res = file->Fstat(sbuff);
1147 dec_ref_cnt(file, false);
1148 TRACE(Debug, tpfx << "from active file " << curl << " -> " << res);
1149 return res;
1150 }
1151
1152 int res = m_oss->Stat(f_name.c_str(), &sbuff);
1153 if (res != XrdOssOK) {
1154 TRACE(Debug, tpfx << curl << " -> " << res);
1155 return 1; // res; -- for only-if-cached
1156 }
1157 if (S_ISDIR(sbuff.st_mode))
1158 {
1159 TRACE(Debug, tpfx << curl << " -> EISDIR");
1160 return -EISDIR;
1161 }
1162
1163 long long file_size = DetermineFullFileSize(f_name + Info::s_infoExtension);
1164 if (file_size < 0) {
1165 TRACE(Debug, tpfx << curl << " -> " << file_size);
1166 return 1; // (int) file_size; -- for only-if-cached
1167 }
1168 sbuff.st_size = file_size;
1169 bool is_cached = DecideIfConsideredCached(file_size, sbuff.st_blocks * 512ll);
1170 if ( ! is_cached)
1171 sbuff.st_atime = 0;
1172
1173 TRACE(Debug, tpfx << "from disk " << curl << " -> " << res);
1174
1175 return 0;
1176}

References Debug, DecideIfConsideredCached(), DetermineFullFileSize(), XrdPfc::File::Fstat(), XrdCl::URL::GetPath(), XrdPfc::Info::s_infoExtension, stat, TRACE, and XrdOssOK.

+ Here is the call graph for this function:

◆ TheOne()

const Cache & Cache::TheOne ( )
static

Definition at line 133 of file XrdPfc.cc.

133{ return *m_instance; }

References Cache().

Referenced by XrdPfc::OldStylePurgeDriver(), and XrdPfc::UnlinkPurgeStateFilesInMap().

+ Here is the call graph for this function:
+ Here is the caller graph for this function:

◆ Unlink()

int Cache::Unlink ( const char * curl)
virtual
Returns
<0 - Unlink failed, value is -errno. =0 - Unlink succeeded.

Reimplemented from XrdOucCache.

Definition at line 1185 of file XrdPfc.cc.

1186{
1187 XrdCl::URL url(curl);
1188 std::string f_name = url.GetPath();
1189
1190 // printf("Unlink url=%s\n\t fname=%s\n", curl, f_name.c_str());
1191
1192 return UnlinkFile(f_name, false);
1193}

References XrdCl::URL::GetPath(), and UnlinkFile().

+ Here is the call graph for this function:

◆ UnlinkFile()

int Cache::UnlinkFile ( const std::string & f_name,
bool fail_if_open )

Remove cinfo and data files from cache.

Definition at line 1195 of file XrdPfc.cc.

1196{
1197 static const char* trc_pfx = "UnlinkFile ";
1198 ActiveMap_i it;
1199 File *file = 0;
1200 long long st_blocks_to_purge = 0;
1201 {
1202 XrdSysCondVarHelper lock(&m_active_cond);
1203
1204 it = m_active.find(f_name);
1205
1206 if (it != m_active.end())
1207 {
1208 if (fail_if_open)
1209 {
1210 TRACE(Info, trc_pfx << f_name << ", file currently open and force not requested - denying request");
1211 return -EBUSY;
1212 }
1213
1214 // Null File* in m_active map means an operation is ongoing, probably
1215 // Attach() with possible File::Open(). Ask for retry.
1216 if (it->second == 0)
1217 {
1218 TRACE(Info, trc_pfx << f_name << ", an operation on this file is ongoing - denying request");
1219 return -EAGAIN;
1220 }
1221
1222 file = it->second;
1223 st_blocks_to_purge = file->initiate_emergency_shutdown();
1224 it->second = 0;
1225 }
1226 else
1227 {
1228 it = m_active.insert(std::make_pair(f_name, (File*) 0)).first;
1229 }
1230 }
1231
1232 if (file) {
1233 RemoveWriteQEntriesFor(file);
1234 } else {
1235 struct stat f_stat;
1236 if (m_oss->Stat(f_name.c_str(), &f_stat) == XrdOssOK)
1237 st_blocks_to_purge = f_stat.st_blocks;
1238 }
1239
1240 std::string i_name = f_name + Info::s_infoExtension;
1241
1242 // Unlink file & cinfo
1243 int f_ret = m_oss->Unlink(f_name.c_str());
1244 int i_ret = m_oss->Unlink(i_name.c_str());
1245
1246 if (st_blocks_to_purge)
1247 m_res_mon->register_file_purge(f_name, st_blocks_to_purge);
1248
1249 TRACE(Debug, trc_pfx << f_name << ", f_ret=" << f_ret << ", i_ret=" << i_ret);
1250
1251 {
1252 XrdSysCondVarHelper lock(&m_active_cond);
1253
1254 m_active.erase(it);
1255 }
1256
1257 return std::min(f_ret, i_ret);
1258}
void RemoveWriteQEntriesFor(File *f)
Remove blocks from write queue which belong to given prefetch. This method is used at the time of File destruction.
Definition XrdPfc.cc:244
long long initiate_emergency_shutdown()

References Debug, XrdPfc::File::initiate_emergency_shutdown(), RemoveWriteQEntriesFor(), XrdPfc::Info::s_infoExtension, stat, TRACE, and XrdOssOK.

Referenced by ExecuteCommandUrl(), XrdPfc::File::Sync(), and Unlink().

+ Here is the call graph for this function:
+ Here is the caller graph for this function:

◆ VCheck()

static bool XrdPfc::Cache::VCheck ( XrdVersionInfo & urVersion)
inline static

Version check.

Definition at line 235 of file XrdPfc.hh.

235{ return true; }

◆ WriteFileSizeXAttr()

void Cache::WriteFileSizeXAttr ( int cinfo_fd,
long long file_size )

Definition at line 914 of file XrdPfc.cc.

915{
916 if (m_metaXattr) {
917 int res = XrdSysXAttrActive->Set("pfc.fsize", &file_size, sizeof(long long), 0, cinfo_fd, 0);
918 if (res != 0) {
919 TRACE(Debug, "WriteFileSizeXAttr error setting xattr " << res);
920 }
921 }
922}
virtual int Set(const char *Aname, const void *Aval, int Avsz, const char *Path, int fd=-1, int isNew=0)=0

References Debug, TRACE, and XrdSysXAttrActive.

◆ WritesSinceLastCall()

long long Cache::WritesSinceLastCall ( )

Definition at line 324 of file XrdPfc.cc.

325{
326 // Called from ResourceMonitor for an alternative estimation of disk writes.
327 XrdSysCondVarHelper lock(&m_writeQ.condVar);
328 long long ret = m_writeQ.writes_between_purges;
329 m_writeQ.writes_between_purges = 0;
330 return ret;
331}

Referenced by XrdPfc::ResourceMonitor::perform_purge_check().

+ Here is the caller graph for this function:

Member Data Documentation

◆ schedP

XrdScheduler * Cache::schedP = nullptr
static

The documentation for this class was generated from the following files: