XRootD
Loading...
Searching...
No Matches
XrdPfcConfiguration.cc
Go to the documentation of this file.
1#include "XrdPfc.hh"
2#include "XrdPfcTrace.hh"
3#include "XrdPfcInfo.hh"
4
6#include "XrdPfcPurgePin.hh"
7
8#include "XrdOss/XrdOss.hh"
9
10#include "XrdOuc/XrdOucEnv.hh"
11#include "XrdOuc/XrdOucUtils.hh"
14#include "XrdOuc/XrdOuca2x.hh"
15
16#include "XrdVersion.hh"
18#include "XrdSys/XrdSysXAttr.hh"
19
20#include <fcntl.h>
21
23
24namespace XrdPfc
25{
26 const char *trace_what_strings[] = {"","error ","warning ","info ","debug ","dump "};
27}
28
29using namespace XrdPfc;
30
32
66
67
68bool Cache::cfg2bytes(const std::string &str, long long &store, long long totalSpace, const char *name)
69{
70 char errStr[1024];
71 snprintf(errStr, 1024, "ConfigParameters() Error parsing parameter %s", name);
72
73 if (::isalpha(*(str.rbegin())))
74 {
75 if (XrdOuca2x::a2sz(m_log, errStr, str.c_str(), &store, 0, totalSpace))
76 {
77 return false;
78 }
79 }
80 else
81 {
82 char *eP;
83 errno = 0;
84 double frac = strtod(str.c_str(), &eP);
85 if (errno || eP == str.c_str())
86 {
87 m_log.Emsg(errStr, str.c_str());
88 return false;
89 }
90
91 store = static_cast<long long>(totalSpace * frac + 0.5);
92 }
93
94 if (store < 0 || store > totalSpace)
95 {
96 snprintf(errStr, 1024, "ConfigParameters() Error: parameter %s should be between 0 and total available disk space (%lld) - it is %lld (given as %s)",
97 name, totalSpace, store, str.c_str());
98 m_log.Emsg(errStr, "");
99 return false;
100 }
101
102 return true;
103}
104
105/* Function: xcschk
106
107 Purpose: To parse the directive: cschk <parms>
108
109 parms: [[no]net] [[no]tls] [[no]cache] [uvkeep <arg>]
110
111 all Checksum check on cache & net transfers.
112 cache Checksum check on cache only, 'no' turns it off.
113 net Checksum check on net transfers 'no' turns it off.
114 tls use TLS if server doesn't support checksums 'no' turns it off.
115 uvkeep Maximum amount of time a cached file make be kept if it
116 contains unverified checksums as n[d|h|m|s], where 'n'
117 is a non-negative integer. A value of 0 prohibits disk
118 caching unless the checksum can be verified. You can
119 also specify "lru" which means the standard purge policy
120 is to be used.
121
122 Output: true upon success or false upon failure.
123 */
124bool Cache::xcschk(XrdOucStream &Config)
125{
126 const char *val, *val2;
127 struct cschkopts {const char *opname; int opval;} csopts[] =
128 {
129 {"off", CSChk_None},
130 {"cache", CSChk_Cache},
131 {"net", CSChk_Net},
132 {"tls", CSChk_TLS}
133 };
134 int i, numopts = sizeof(csopts)/sizeof(struct cschkopts);
135 bool isNo;
136
137 if (! (val = Config.GetWord()))
138 {m_log.Emsg("Config", "cschk parameter not specified"); return false; }
139
140 while(val)
141 {
142 if ((isNo = strncmp(val, "no", 2) == 0))
143 val2 = val + 2;
144 else
145 val2 = val;
146 for (i = 0; i < numopts; i++)
147 {
148 if (!strcmp(val2, csopts[i].opname))
149 {
150 if (isNo)
151 m_configuration.m_cs_Chk &= ~csopts[i].opval;
152 else if (csopts[i].opval)
153 m_configuration.m_cs_Chk |= csopts[i].opval;
154 else
155 m_configuration.m_cs_Chk = csopts[i].opval;
156 break;
157 }
158 }
159 if (i >= numopts)
160 {
161 if (strcmp(val, "uvkeep"))
162 {
163 m_log.Emsg("Config", "invalid cschk option -", val);
164 return false;
165 }
166 if (!(val = Config.GetWord()))
167 {
168 m_log.Emsg("Config", "cschk uvkeep value not specified");
169 return false;
170 }
171 if (!strcmp(val, "lru"))
172 m_configuration.m_cs_UVKeep = -1;
173 else
174 {
175 int uvkeep;
176 if (XrdOuca2x::a2tm(m_log, "uvkeep time", val, &uvkeep, 0))
177 return false;
178 m_configuration.m_cs_UVKeep = uvkeep;
179 }
180 }
181 val = Config.GetWord();
182 }
183 // Decompose into separate TLS state, it is only passed on to psx
184 m_configuration.m_cs_ChkTLS = m_configuration.m_cs_Chk & CSChk_TLS;
185 m_configuration.m_cs_Chk &= ~CSChk_TLS;
186
187 m_env->Put("psx.CSNet", m_configuration.is_cschk_net() ? (m_configuration.m_cs_ChkTLS ? "2" : "1") : "0");
188
189 return true;
190}
191
192
193/* Function: xdlib
194
195 Purpose: To parse the directive: decisionlib <path> [<parms>]
196
197 <path> the path of the decision library to be used.
198 <parms> optional parameters to be passed.
199
200
201 Output: true upon success or false upon failure.
202 */
203bool Cache::xdlib(XrdOucStream &Config)
204{
205 const char* val;
206
207 std::string libp;
208 if (! (val = Config.GetWord()) || ! val[0])
209 {
210 TRACE(Info," Cache::Config() decisionlib not specified; always caching files");
211 return true;
212 }
213 else
214 {
215 libp = val;
216 }
217
218 char params[4096];
219 if (val[0])
220 Config.GetRest(params, 4096);
221 else
222 params[0] = 0;
223
224 XrdOucPinLoader* myLib = new XrdOucPinLoader(&m_log, 0, "decisionlib",
225 libp.c_str());
226
227 Decision *(*ep)(XrdSysError&);
228 ep = (Decision *(*)(XrdSysError&))myLib->Resolve("XrdPfcGetDecision");
229 if (! ep) {myLib->Unload(true); return false; }
230
231 Decision * d = ep(m_log);
232 if (! d)
233 {
234 TRACE(Error, "Config() decisionlib was not able to create a decision object");
235 return false;
236 }
237 if (params[0])
238 d->ConfigDecision(params);
239
240 m_decisionpoints.push_back(d);
241 return true;
242}
243
244/* Function: xplib
245
246 Purpose: To parse the directive: purgelib <path> [<parms>]
247
248 <path> the path of the decision library to be used.
249 <parms> optional parameters to be passed.
250
251
252 Output: true upon success or false upon failure.
253 */
254bool Cache::xplib(XrdOucStream &Config)
255{
256 const char* val;
257
258 std::string libp;
259 if (! (val = Config.GetWord()) || ! val[0])
260 {
261 TRACE(Info," Cache::Config() purgelib not specified; will use LRU for purging files");
262 return true;
263 }
264 else
265 {
266 libp = val;
267 }
268
269 char params[4096];
270 if (val[0])
271 Config.GetRest(params, 4096);
272 else
273 params[0] = 0;
274
275 XrdOucPinLoader* myLib = new XrdOucPinLoader(&m_log, 0, "purgelib",
276 libp.c_str());
277
278 PurgePin *(*ep)(XrdSysError&);
279 ep = (PurgePin *(*)(XrdSysError&))myLib->Resolve("XrdPfcGetPurgePin");
280 if (! ep) {myLib->Unload(true); return false; }
281
282 PurgePin * dp = ep(m_log);
283 if (! dp)
284 {
285 TRACE(Error, "Config() purgelib was not able to create a Purge Plugin object?");
286 return false;
287 }
288 m_purge_pin = dp;
289
290 if (params[0])
291 m_purge_pin->ConfigPurgePin(params);
292
293
294 return true;
295}
296
297/* Function: xtrace
298
299 Purpose: To parse the directive: trace <level>
300 Output: true upon success or false upon failure.
301 */
302bool Cache::xtrace(XrdOucStream &Config)
303{
304 char *val;
305 static struct traceopts {const char *opname; int opval; } tropts[] =
306 {
307 {"none", 0},
308 {"error", 1},
309 {"warning", 2},
310 {"info", 3},
311 {"debug", 4},
312 {"dump", 5},
313 {"dumpxl", 6}
314 };
315 int numopts = sizeof(tropts)/sizeof(struct traceopts);
316
317 if (! (val = Config.GetWord()))
318 {m_log.Emsg("Config", "trace option not specified"); return 1; }
319
320 for (int i = 0; i < numopts; i++)
321 {
322 if (! strcmp(val, tropts[i].opname))
323 {
324 m_trace->What = tropts[i].opval;
325 return true;
326 }
327 }
328 m_log.Emsg("Config", "invalid trace option -", val);
329 return false;
330}
331
332// Determine if oss spaces are operational and if they support xattrs.
333bool Cache::test_oss_basics_and_features()
334{
335 static const char *epfx = "test_oss_basics_and_features()";
336
337 const auto &conf = m_configuration;
338 const char *user = conf.m_username.c_str();
339 XrdOucEnv env;
340
341 auto check_space = [&](const char *space, bool &has_xattr)
342 {
343 std::string fname("__prerun_test_pfc_");
344 fname += space;
345 fname += "_space__";
346 env.Put("oss.cgroup", space);
347
348 int res = m_oss->Create(user, fname.c_str(), 0600, env, XRDOSS_mkpath);
349 if (res != XrdOssOK) {
350 m_log.Emsg(epfx, "Can not create a file on space", space);
351 return false;
352 }
353 XrdOssDF *oss_file = m_oss->newFile(user);
354 res = oss_file->Open(fname.c_str(), O_RDWR, 0600, env);
355 if (res != XrdOssOK) {
356 m_log.Emsg(epfx, "Can not open a file on space", space);
357 return false;
358 }
359 res = oss_file->Write(fname.data(), 0, fname.length());
360 if (res != (int) fname.length()) {
361 m_log.Emsg(epfx, "Can not write into a file on space", space);
362 return false;
363 }
364
365 has_xattr = true;
366 long long fsize = fname.length();
367 res = XrdSysXAttrActive->Set("pfc.fsize", &fsize, sizeof(long long), 0, oss_file->getFD(), 0);
368 if (res != 0) {
369 m_log.Emsg(epfx, "Can not write xattr to a file on space", space);
370 has_xattr = false;
371 }
372
373 oss_file->Close();
374
375 if (has_xattr) {
376 char pfn[4096];
377 m_oss->Lfn2Pfn(fname.c_str(), pfn, 4096);
378 fsize = -1ll;
379 res = XrdSysXAttrActive->Get("pfc.fsize", &fsize, sizeof(long long), pfn);
380 if (res != sizeof(long long) || fsize != (long long) fname.length())
381 {
382 m_log.Emsg(epfx, "Can not read xattr from a file on space", space);
383 has_xattr = false;
384 }
385 }
386
387 res = m_oss->Unlink(fname.c_str());
388 if (res != XrdOssOK) {
389 m_log.Emsg(epfx, "Can not unlink a file on space", space);
390 return false;
391 }
392
393 return true;
394 };
395
396 bool aOK = true;
397 aOK &= check_space(conf.m_data_space.c_str(), m_dataXattr);
398 aOK &= check_space(conf.m_meta_space.c_str(), m_metaXattr);
399
400 return aOK;
401}
402
403//______________________________________________________________________________
404/* Function: Config
405
406 Purpose: To parse configuration file and configure Cache instance.
407 Output: true upon success or false upon failure.
408 */
409bool Cache::Config(const char *config_filename, const char *parameters)
410{
411 // Indicate whether or not we are a client instance
412 const char *theINS = getenv("XRDINSTANCE");
413 m_isClient = (theINS != 0 && strncmp("*client ", theINS, 8) == 0);
414
415 // Tell everyone else we are a caching proxy
416 XrdOucEnv::Export("XRDPFC", 1);
417
418 XrdOucEnv myEnv;
419 XrdOucStream Config(&m_log, theINS, &myEnv, "=====> ");
420
421 if (! config_filename || ! *config_filename)
422 {
423 TRACE(Error, "Config() configuration file not specified.");
424 return false;
425 }
426
427 int fd;
428 if ( (fd = open(config_filename, O_RDONLY, 0)) < 0)
429 {
430 TRACE( Error, "Config() can't open configuration file " << config_filename);
431 return false;
432 }
433
434 Config.Attach(fd);
435 static const char *cvec[] = { "*** pfc plugin config:", 0 };
436 Config.Capture(cvec);
437
438 // Obtain OFS configurator for OSS plugin.
439 XrdOfsConfigPI *ofsCfg = XrdOfsConfigPI::New(config_filename,&Config,&m_log,
440 &XrdVERSIONINFOVAR(XrdOucGetCache));
441 if (! ofsCfg) return false;
442
443 TmpConfiguration tmpc;
444
445 // Adjust default parameters for client/serverless caching
446 if (m_isClient)
447 {
448 m_configuration.m_bufferSize = 128 * 1024; // same as normal.
449 m_configuration.m_wqueue_blocks = 8;
450 m_configuration.m_wqueue_threads = 1;
451 }
452
453 // If network checksum processing is the default, indicate so.
454 if (m_configuration.is_cschk_net()) m_env->Put("psx.CSNet", m_configuration.m_cs_ChkTLS ? "2" : "1");
455
456 // Actual parsing of the config file.
457 bool retval = true, aOK = true;
458 char *var;
459 while ((var = Config.GetMyFirstWord()))
460 {
461 if (! strcmp(var,"pfc.osslib"))
462 {
463 retval = ofsCfg->Parse(XrdOfsConfigPI::theOssLib);
464 }
465 else if (! strcmp(var,"pfc.cschk"))
466 {
467 retval = xcschk(Config);
468 }
469 else if (! strcmp(var,"pfc.decisionlib"))
470 {
471 retval = xdlib(Config);
472 }
473 else if (! strcmp(var,"pfc.purgelib"))
474 {
475 retval = xplib(Config);
476 }
477 else if (! strcmp(var,"pfc.trace"))
478 {
479 retval = xtrace(Config);
480 }
481 else if (! strcmp(var,"pfc.allow_xrdpfc_command"))
482 {
483 m_configuration.m_allow_xrdpfc_command = true;
484 }
485 else if (! strncmp(var,"pfc.", 4))
486 {
487 retval = ConfigParameters(std::string(var+4), Config, tmpc);
488 }
489
490 if ( ! retval)
491 {
492 TRACE(Error, "Config() error in parsing");
493 aOK = false;
494 }
495 }
496
497 Config.Close();
498
499 // Load OSS plugin.
500 myEnv.Put("oss.runmode", "pfc");
501 if (m_configuration.is_cschk_cache())
502 {
503 char csi_conf[128];
504 if (snprintf(csi_conf, 128, "space=%s nofill", m_configuration.m_meta_space.c_str()) < 128)
505 {
506 ofsCfg->Push(XrdOfsConfigPI::theOssLib, "libXrdOssCsi.so", csi_conf);
507 } else {
508 TRACE(Error, "Config() buffer too small for libXrdOssCsi params.");
509 return false;
510 }
511 }
512 if (ofsCfg->Load(XrdOfsConfigPI::theOssLib, &myEnv))
513 {
514 ofsCfg->Plugin(m_oss);
515 }
516 else
517 {
518 TRACE(Error, "Config() Unable to create an OSS object");
519 return false;
520 }
521
522 // Test if OSS is operational, determine optional features.
523 aOK &= test_oss_basics_and_features();
524
525 // sets default value for disk usage
526 XrdOssVSInfo sP;
527 {
528 if (m_configuration.m_meta_space != m_configuration.m_data_space &&
529 m_oss->StatVS(&sP, m_configuration.m_meta_space.c_str(), 1) < 0)
530 {
531 m_log.Emsg("ConfigParameters()", "error obtaining stat info for meta space ", m_configuration.m_meta_space.c_str());
532 return false;
533 }
534 if (m_configuration.m_meta_space != m_configuration.m_data_space && sP.Total < 10ll << 20)
535 {
536 m_log.Emsg("ConfigParameters()", "available data space is less than 10 MB (can be due to a mistake in oss.localroot directive) for space ",
537 m_configuration.m_meta_space.c_str());
538 return false;
539 }
540 if (m_oss->StatVS(&sP, m_configuration.m_data_space.c_str(), 1) < 0)
541 {
542 m_log.Emsg("ConfigParameters()", "error obtaining stat info for data space ", m_configuration.m_data_space.c_str());
543 return false;
544 }
545 if (sP.Total < 10ll << 20)
546 {
547 m_log.Emsg("ConfigParameters()", "available data space is less than 10 MB (can be due to a mistake in oss.localroot directive) for space ",
548 m_configuration.m_data_space.c_str());
549 return false;
550 }
551
552 m_configuration.m_diskTotalSpace = sP.Total;
553
554 if (cfg2bytes(tmpc.m_diskUsageLWM, m_configuration.m_diskUsageLWM, sP.Total, "lowWatermark") &&
555 cfg2bytes(tmpc.m_diskUsageHWM, m_configuration.m_diskUsageHWM, sP.Total, "highWatermark"))
556 {
557 if (m_configuration.m_diskUsageLWM >= m_configuration.m_diskUsageHWM) {
558 m_log.Emsg("ConfigParameters()", "pfc.diskusage should have lowWatermark < highWatermark.");
559 aOK = false;
560 }
561 }
562 else aOK = false;
563
564 if ( ! tmpc.m_fileUsageMax.empty())
565 {
566 if (cfg2bytes(tmpc.m_fileUsageBaseline, m_configuration.m_fileUsageBaseline, sP.Total, "files baseline") &&
567 cfg2bytes(tmpc.m_fileUsageNominal, m_configuration.m_fileUsageNominal, sP.Total, "files nominal") &&
568 cfg2bytes(tmpc.m_fileUsageMax, m_configuration.m_fileUsageMax, sP.Total, "files max"))
569 {
570 if (m_configuration.m_fileUsageBaseline >= m_configuration.m_fileUsageNominal ||
571 m_configuration.m_fileUsageBaseline >= m_configuration.m_fileUsageMax ||
572 m_configuration.m_fileUsageNominal >= m_configuration.m_fileUsageMax)
573 {
574 m_log.Emsg("ConfigParameters()", "pfc.diskusage files should have baseline < nominal < max.");
575 aOK = false;
576 }
577
578
579 if (aOK && m_configuration.m_fileUsageMax >= m_configuration.m_diskUsageLWM)
580 {
581 m_log.Emsg("ConfigParameters()", "pfc.diskusage files values must be below lowWatermark");
582 aOK = false;
583 }
584 }
585 else aOK = false;
586 }
587 }
588
589 // sets flush frequency
590 if ( ! tmpc.m_flushRaw.empty())
591 {
592 if (::isalpha(*(tmpc.m_flushRaw.rbegin())))
593 {
594 if (XrdOuca2x::a2sz(m_log, "Error getting number of bytes written before flush", tmpc.m_flushRaw.c_str(),
595 &m_configuration.m_flushCnt,
596 100 * m_configuration.m_bufferSize , 100000 * m_configuration.m_bufferSize))
597 {
598 return false;
599 }
600 m_configuration.m_flushCnt /= m_configuration.m_bufferSize;
601 }
602 else
603 {
604 if (XrdOuca2x::a2ll(m_log, "Error getting number of blocks written before flush", tmpc.m_flushRaw.c_str(),
605 &m_configuration.m_flushCnt, 100, 100000))
606 {
607 return false;
608 }
609 }
610 }
611
612 // set the write mode
613 if ( ! tmpc.m_writemodeRaw.empty())
614 {
615 if (tmpc.m_writemodeRaw == "writethrough")
616 {
617 m_configuration.m_write_through = true;
618 }
619 else if (tmpc.m_writemodeRaw != "off")
620 {
621 m_log.Emsg("ConfigParameters()", "Unknown value for pfc.writemode (valid values are `writethrough` or `off`): %s",
622 tmpc.m_writemodeRaw.c_str());
623 return false;
624 }
625 }
626
627 // get number of available RAM blocks after process configuration
628 if (m_configuration.m_RamAbsAvailable == 0)
629 {
630 m_configuration.m_RamAbsAvailable = m_isClient ? 256ll * 1024 * 1024 : 1024ll * 1024 * 1024;
631 char buff[1024];
632 snprintf(buff, sizeof(buff), "RAM usage pfc.ram is not specified. Default value %s is used.", m_isClient ? "256m" : "1g");
633 m_log.Say("Config info: ", buff);
634 }
635 // Setup number of standard-size blocks not released back to the system to 5% of total RAM.
636 m_configuration.m_RamKeepStdBlocks = (m_configuration.m_RamAbsAvailable / m_configuration.m_bufferSize + 1) * 5 / 100;
637
638 // Set tracing to debug if this is set in environment
639 char* cenv = getenv("XRDDEBUG");
640 if (cenv && ! strcmp(cenv,"1") && m_trace->What < 4) m_trace->What = 4;
641
642 if (aOK)
643 {
644 int loff = 0;
645// 000 001 010
646 const char *csc[] = {"off", "cache nonet", "nocache net notls",
647// 011
648 "cache net notls",
649// 100 101 110
650 "off", "cache nonet", "nocache net tls",
651// 111
652 "cache net tls"};
653 char buff[8192], uvk[32];
654 if (m_configuration.m_cs_UVKeep < 0)
655 strcpy(uvk, "lru");
656 else
657 sprintf(uvk, "%lld", (long long) m_configuration.m_cs_UVKeep);
658 float rg = (m_configuration.m_RamAbsAvailable) / float(1024*1024*1024);
659 loff = snprintf(buff, sizeof(buff), "Config effective %s pfc configuration:\n"
660 " pfc.cschk %s uvkeep %s\n"
661 " pfc.blocksize %lld\n"
662 " pfc.prefetch %d\n"
663 " pfc.ram %.fg\n"
664 " pfc.writequeue %d %d\n"
665 " # Total available disk: %lld\n"
666 " pfc.diskusage %lld %lld files %lld %lld %lld purgeinterval %d purgecoldfiles %d\n"
667 " pfc.spaces %s %s\n"
668 " pfc.trace %d\n"
669 " pfc.flush %lld\n"
670 " pfc.acchistorysize %d\n"
671 " pfc.onlyIfCachedMinBytes %lld\n"
672 " pfc.onlyIfCachedMinFrac %.2f\n",
673 config_filename,
674 csc[int(m_configuration.m_cs_Chk)], uvk,
675 m_configuration.m_bufferSize,
676 m_configuration.m_prefetch_max_blocks,
677 rg,
678 m_configuration.m_wqueue_blocks, m_configuration.m_wqueue_threads,
679 sP.Total,
680 m_configuration.m_diskUsageLWM, m_configuration.m_diskUsageHWM,
681 m_configuration.m_fileUsageBaseline, m_configuration.m_fileUsageNominal, m_configuration.m_fileUsageMax,
682 m_configuration.m_purgeInterval, m_configuration.m_purgeColdFilesAge,
683 m_configuration.m_data_space.c_str(),
684 m_configuration.m_meta_space.c_str(),
685 m_trace->What,
686 m_configuration.m_flushCnt,
687 m_configuration.m_accHistorySize,
688 m_configuration.m_onlyIfCachedMinSize,
689 m_configuration.m_onlyIfCachedMinFrac);
690
691 if (m_configuration.is_dir_stat_reporting_on())
692 {
693 loff += snprintf(buff + loff, sizeof(buff) - loff,
694 " pfc.dirstats interval %d maxdepth %d ((internal: store_depth %d, size_of_dirlist %d, size_of_globlist %d))\n",
695 m_configuration.m_dirStatsInterval, m_configuration.m_dirStatsMaxDepth, m_configuration.m_dirStatsStoreDepth,
696 (int) m_configuration.m_dirStatsDirs.size(), (int) m_configuration.m_dirStatsDirGlobs.size());
697 loff += snprintf(buff + loff, sizeof(buff) - loff, " dirlist:\n");
698 for (std::set<std::string>::iterator i = m_configuration.m_dirStatsDirs.begin(); i != m_configuration.m_dirStatsDirs.end(); ++i)
699 loff += snprintf(buff + loff, sizeof(buff) - loff, " %s\n", i->c_str());
700 loff += snprintf(buff + loff, sizeof(buff) - loff, " globlist:\n");
701 for (std::set<std::string>::iterator i = m_configuration.m_dirStatsDirGlobs.begin(); i != m_configuration.m_dirStatsDirGlobs.end(); ++i)
702 loff += snprintf(buff + loff, sizeof(buff) - loff, " %s/*\n", i->c_str());
703 }
704
705 if (m_configuration.m_hdfsmode)
706 {
707 loff += snprintf(buff + loff, sizeof(buff) - loff, " pfc.hdfsmode hdfsbsize %lld\n", m_configuration.m_hdfsbsize);
708 }
709 loff += snprintf(buff + loff, sizeof(buff) - loff, " pfc.writemode %s\n", m_configuration.m_write_through ? "writethrough" : "off");
710
711 if (m_configuration.m_username.empty())
712 {
713 char unameBuff[256];
714 XrdOucUtils::UserName(getuid(), unameBuff, sizeof(unameBuff));
715 m_configuration.m_username = unameBuff;
716 }
717 else
718 {
719 loff += snprintf(buff + loff, sizeof(buff) - loff, " pfc.user %s\n", m_configuration.m_username.c_str());
720 }
721
722 m_log.Say(buff);
723
724 m_env->Put("XRDPFC.SEGSIZE", std::to_string(m_configuration.m_bufferSize).c_str());
725 }
726
727 // Derived settings
728 m_prefetch_enabled = m_configuration.m_prefetch_max_blocks > 0;
729 Info::s_maxNumAccess = m_configuration.m_accHistorySize;
730
731 m_gstream = (XrdXrootdGStream*) m_env->GetPtr("pfc.gStream*");
732
733 m_log.Say(" pfc g-stream has", m_gstream ? "" : " NOT", " been configured via xrootd.monitor directive\n");
734
735 // Create the ResourceMonitor and get it ready for starting the main thread function.
736 if (aOK)
737 {
738 m_res_mon = new ResourceMonitor(*m_oss);
739 m_res_mon->init_before_main();
740 }
741
742 m_log.Say("=====> Proxy file cache configuration parsing ", aOK ? "completed" : "failed");
743
744 if (ofsCfg) delete ofsCfg;
745
746 // XXXX-CKSUM Testing. To be removed after OssPgi is also merged and valildated.
747 // Building of xrdpfc_print fails when this is enabled.
748#ifdef XRDPFC_CKSUM_TEST
749 {
750 int xxx = m_configuration.m_cs_Chk;
751
752 for (m_configuration.m_cs_Chk = CSChk_None; m_configuration.m_cs_Chk <= CSChk_Both; ++m_configuration.m_cs_Chk)
753 {
754 Info::TestCksumStuff();
755 }
756
757 m_configuration.m_cs_Chk = xxx;
758 }
759#endif
760
761 return aOK;
762}
763
764//------------------------------------------------------------------------------
765
766bool Cache::ConfigParameters(std::string part, XrdOucStream& config, TmpConfiguration &tmpc)
767{
768 struct ConfWordGetter
769 {
770 XrdOucStream &m_config;
771 char *m_last_word;
772
773 ConfWordGetter(XrdOucStream& c) : m_config(c), m_last_word((char*)1) {}
774
775 const char* GetWord() { if (HasLast()) m_last_word = m_config.GetWord(); return HasLast() ? m_last_word : ""; }
776 bool HasLast() { return (m_last_word != 0); }
777 };
778
779 ConfWordGetter cwg(config);
780
781 XrdSysError err(0, "");
782 if ( part == "user" )
783 {
784 m_configuration.m_username = cwg.GetWord();
785 if ( ! cwg.HasLast())
786 {
787 m_log.Emsg("Config", "Error: pfc.user requires a parameter.");
788 return false;
789 }
790 }
791 else if ( part == "diskusage" )
792 {
793 tmpc.m_diskUsageLWM = cwg.GetWord();
794 tmpc.m_diskUsageHWM = cwg.GetWord();
795
796 if (tmpc.m_diskUsageHWM.empty())
797 {
798 m_log.Emsg("Config", "Error: pfc.diskusage parameter requires at least two arguments.");
799 return false;
800 }
801
802 const char *p = 0;
803 while ((p = cwg.GetWord()) && cwg.HasLast())
804 {
805 if (strcmp(p, "files") == 0)
806 {
807 tmpc.m_fileUsageBaseline = cwg.GetWord();
808 tmpc.m_fileUsageNominal = cwg.GetWord();
809 tmpc.m_fileUsageMax = cwg.GetWord();
810
811 if ( ! cwg.HasLast())
812 {
813 m_log.Emsg("Config", "Error: pfc.diskusage files directive requires three arguments.");
814 return false;
815 }
816 }
817 else if (strcmp(p, "sleep") == 0 || strcmp(p, "purgeinterval") == 0)
818 {
819 if (strcmp(p, "sleep") == 0) m_log.Emsg("Config", "warning sleep directive is deprecated in pfc.diskusage. Please use purgeinterval instead.");
820
821 if (XrdOuca2x::a2tm(m_log, "Error getting purgeinterval", cwg.GetWord(), &m_configuration.m_purgeInterval, 60, 3600))
822 {
823 return false;
824 }
825 }
826 else if (strcmp(p, "purgecoldfiles") == 0)
827 {
828 if (XrdOuca2x::a2tm(m_log, "Error getting purgecoldfiles age", cwg.GetWord(), &m_configuration.m_purgeColdFilesAge, 3600, 3600*24*360))
829 {
830 return false;
831 }
832 if (XrdOuca2x::a2i(m_log, "Error getting purgecoldfiles period", cwg.GetWord(), &m_configuration.m_purgeAgeBasedPeriod, 1, 1000))
833 {
834 return false;
835 }
836 }
837 else
838 {
839 m_log.Emsg("Config", "Error: diskusage stanza contains unknown directive", p);
840 }
841 }
842 }
843 else if ( part == "acchistorysize" )
844 {
845 if ( XrdOuca2x::a2i(m_log, "Error getting access-history-size", cwg.GetWord(), &m_configuration.m_accHistorySize, 20, 200))
846 {
847 return false;
848 }
849 }
850 else if ( part == "dirstats" )
851 {
852 const char *p = 0;
853 while ((p = cwg.GetWord()) && cwg.HasLast())
854 {
855 if (strcmp(p, "interval") == 0)
856 {
857 if (XrdOuca2x::a2i(m_log, "Error getting dirstsat interval", cwg.GetWord(), &m_configuration.m_dirStatsInterval, 0, 7 * 24 * 3600))
858 {
859 return false;
860 }
861 int validIntervals[] = {60, 300, 600, 900, 1800, 3600};
862 int size = sizeof(validIntervals) / sizeof(int);
863 bool match = false;
864 std::string vvl;
865 for (int i = 0; i < size; i++) {
866 if (validIntervals[i] == m_configuration.m_dirStatsInterval) {
867 match = true;
868 break;
869 }
870 vvl += std::to_string(validIntervals[i]);
871 if ((i+1) != size) vvl += ", ";
872 }
873
874 if (!match) {
875 m_log.Emsg("Config", "Error: Dirstat interval is not valid. Possible interval values are ", vvl.c_str());
876 return false;
877 }
878
879 }
880 else if (strcmp(p, "maxdepth") == 0)
881 {
882 if (XrdOuca2x::a2i(m_log, "Error getting maxdepth value", cwg.GetWord(), &m_configuration.m_dirStatsMaxDepth, 0, 16))
883 {
884 return false;
885 }
886 m_configuration.m_dirStatsStoreDepth = std::max(m_configuration.m_dirStatsStoreDepth, m_configuration.m_dirStatsMaxDepth);
887 }
888 else if (strcmp(p, "dir") == 0)
889 {
890 p = cwg.GetWord();
891 if (p && p[0] == '/')
892 {
893 // XXX -- should we just store them as sets of PathTokenizer objects, not strings?
894
895 char d[1024]; d[0] = 0;
896 int depth = 0;
897 { // Compress multiple slashes and "measure" depth
898 const char *pp = p;
899 char *pd = d;
900 *(pd++) = *(pp++);
901 while (*pp != 0)
902 {
903 if (*(pd - 1) == '/')
904 {
905 if (*pp == '/')
906 {
907 ++pp; continue;
908 }
909 ++depth;
910 }
911 *(pd++) = *(pp++);
912 }
913 *(pd--) = 0;
914 // remove trailing but but not leading /
915 if (*pd == '/' && pd != d) *pd = 0;
916 }
917 int ld = strlen(d);
918 if (ld >= 2 && d[ld-1] == '*' && d[ld-2] == '/')
919 {
920 d[ld-2] = 0;
921 ld -= 2;
922 m_configuration.m_dirStatsDirGlobs.insert(d);
923 printf("Glob %s -> %s -- depth = %d\n", p, d, depth);
924 }
925 else
926 {
927 m_configuration.m_dirStatsDirs.insert(d);
928 printf("Dir %s -> %s -- depth = %d\n", p, d, depth);
929 }
930
931 m_configuration.m_dirStatsStoreDepth = std::max(m_configuration.m_dirStatsStoreDepth, depth);
932 }
933 else
934 {
935 m_log.Emsg("Config", "Error: dirstats dir parameter requires a directory argument starting with a '/'.");
936 return false;
937 }
938 }
939 else
940 {
941 m_log.Emsg("Config", "Error: dirstats stanza contains unknown directive '", p, "'");
942 return false;
943 }
944 }
945 }
946 else if ( part == "blocksize" )
947 {
948 long long minBSize = 4 * 1024;
949 long long maxBSize = 512 * 1024 * 1024;
950 if (XrdOuca2x::a2sz(m_log, "Error reading block-size", cwg.GetWord(), &m_configuration.m_bufferSize, minBSize, maxBSize))
951 {
952 return false;
953 }
954 if (m_configuration.m_bufferSize & 0xFFF)
955 {
956 m_configuration.m_bufferSize &= ~0x0FFF;
957 m_configuration.m_bufferSize += 0x1000;
958 m_log.Emsg("Config", "pfc.blocksize must be a multiple of 4 kB. Rounded up.");
959 }
960 }
961 else if ( part == "prefetch" || part == "nramprefetch" )
962 {
963 if (part == "nramprefetch")
964 {
965 m_log.Emsg("Config", "pfc.nramprefetch is deprecated, please use pfc.prefetch instead. Replacing the directive internally.");
966 }
967
968 if (XrdOuca2x::a2i(m_log, "Error setting prefetch block count", cwg.GetWord(), &m_configuration.m_prefetch_max_blocks, 0, 128))
969 {
970 return false;
971 }
972
973 }
974 else if ( part == "nramread" )
975 {
976 m_log.Emsg("Config", "pfc.nramread is deprecated, please use pfc.ram instead. Ignoring this directive.");
977 cwg.GetWord(); // Ignoring argument.
978 }
979 else if ( part == "ram" )
980 {
981 long long minRAM = m_isClient ? 256 * 1024 * 1024 : 1024 * 1024 * 1024;
982 long long maxRAM = 256 * minRAM;
983 if ( XrdOuca2x::a2sz(m_log, "get RAM available", cwg.GetWord(), &m_configuration.m_RamAbsAvailable, minRAM, maxRAM))
984 {
985 return false;
986 }
987 }
988 else if ( part == "writequeue")
989 {
990 if (XrdOuca2x::a2i(m_log, "Error getting pfc.writequeue num-blocks", cwg.GetWord(), &m_configuration.m_wqueue_blocks, 1, 1024))
991 {
992 return false;
993 }
994 if (XrdOuca2x::a2i(m_log, "Error getting pfc.writequeue num-threads", cwg.GetWord(), &m_configuration.m_wqueue_threads, 1, 64))
995 {
996 return false;
997 }
998 }
999 else if ( part == "spaces" )
1000 {
1001 m_configuration.m_data_space = cwg.GetWord();
1002 m_configuration.m_meta_space = cwg.GetWord();
1003 if ( ! cwg.HasLast())
1004 {
1005 m_log.Emsg("Config", "spacenames requires two parameters: <data-space> <metadata-space>.");
1006 return false;
1007 }
1008 }
1009 else if ( part == "hdfsmode" )
1010 {
1011 m_log.Emsg("Config", "pfc.hdfsmode is currently unsupported.");
1012 return false;
1013
1014 m_configuration.m_hdfsmode = true;
1015
1016 const char* params = cwg.GetWord();
1017 if (params)
1018 {
1019 if (! strncmp("hdfsbsize", params, 9))
1020 {
1021 long long minBlSize = 32 * 1024;
1022 long long maxBlSize = 128 * 1024 * 1024;
1023 if ( XrdOuca2x::a2sz(m_log, "Error getting file fragment size", cwg.GetWord(), &m_configuration.m_hdfsbsize, minBlSize, maxBlSize))
1024 {
1025 return false;
1026 }
1027 }
1028 else
1029 {
1030 m_log.Emsg("Config", "Error setting the fragment size parameter name");
1031 return false;
1032 }
1033 }
1034 }
1035 else if ( part == "writemode" )
1036 {
1037 tmpc.m_writemodeRaw = cwg.GetWord();
1038 if ( ! cwg.HasLast())
1039 {
1040 m_log.Emsg("Config", "Error: pfc.writemode requires a parameter.");
1041 return false;
1042 }
1043 }
1044 else if ( part == "flush" )
1045 {
1046 tmpc.m_flushRaw = cwg.GetWord();
1047 if ( ! cwg.HasLast())
1048 {
1049 m_log.Emsg("Config", "Error: pfc.flush requires a parameter.");
1050 return false;
1051 }
1052 }
1053 else if ( part == "onlyifcached" )
1054 {
1055 const char *p = 0;
1056 while ((p = cwg.GetWord()) && cwg.HasLast())
1057 {
1058 if (strcmp(p, "minsize") == 0)
1059 {
1060 std::string minBytes = cwg.GetWord();
1061 long long minBytesTop = 1024 * 1024 * 1024;
1062 if (::isalpha(*(minBytes.rbegin())))
1063 {
1064 if (XrdOuca2x::a2sz(m_log, "Error in parsing minsize value for onlyifcached parameter", minBytes.c_str(), &m_configuration.m_onlyIfCachedMinSize, 0, minBytesTop))
1065 {
1066 return false;
1067 }
1068 }
1069 else
1070 {
1071 if (XrdOuca2x::a2ll(m_log, "Error in parsing numeric minsize value for onlyifcached parameter", minBytes.c_str(),&m_configuration.m_onlyIfCachedMinSize, 0, minBytesTop))
1072 {
1073 return false;
1074 }
1075 }
1076 }
1077 if (strcmp(p, "minfrac") == 0)
1078 {
1079 std::string minFrac = cwg.GetWord();
1080 char *eP;
1081 errno = 0;
1082 double frac = strtod(minFrac.c_str(), &eP);
1083 if (errno || eP == minFrac.c_str())
1084 {
1085 m_log.Emsg("Config", "Error setting fraction for only-if-cached directive");
1086 return false;
1087 }
1088 m_configuration.m_onlyIfCachedMinFrac = frac;
1089 }
1090 else
1091 {
1092 m_log.Emsg("Config", "Error: onlyifcached stanza contains unknown directive", p);
1093 }
1094 }
1095 }
1096 else
1097 {
1098 m_log.Emsg("ConfigParameters() unmatched pfc parameter", part.c_str());
1099 return false;
1100 }
1101
1102 return true;
1103}
#define XrdOssOK
Definition XrdOss.hh:50
#define XRDOSS_mkpath
Definition XrdOss.hh:466
XrdVERSIONINFO(XrdOucGetCache, XrdPfc)
XrdSysXAttr * XrdSysXAttrActive
XrdOucCache * XrdOucGetCache(XrdSysLogger *logger, const char *config_filename, const char *parameters, XrdOucEnv *env)
Definition XrdPfc.cc:76
#define open
Definition XrdPosix.hh:76
int isNo(int dflt, const char *Msg1, const char *Msg2, const char *Msg3)
#define TRACE(act, x)
Definition XrdTrace.hh:63
bool Parse(TheLib what)
bool Plugin(XrdAccAuthorize *&piP)
Get Authorization plugin.
static XrdOfsConfigPI * New(const char *cfn, XrdOucStream *cfgP, XrdSysError *errP, XrdVersionInfo *verP=0, XrdSfsFileSystem *sfsP=0)
bool Load(int what, XrdOucEnv *envP=0)
bool Push(TheLib what, const char *plugP, const char *parmP=0)
@ theOssLib
Oss plugin.
virtual int Close(long long *retsz=0)=0
virtual int getFD()
Definition XrdOss.hh:426
virtual int Open(const char *path, int Oflag, mode_t Mode, XrdOucEnv &env)
Definition XrdOss.hh:200
virtual ssize_t Write(const void *buffer, off_t offset, size_t size)
Definition XrdOss.hh:345
long long Total
Definition XrdOssVS.hh:90
static int Export(const char *Var, const char *Val)
Definition XrdOucEnv.cc:188
void Put(const char *varname, const char *value)
Definition XrdOucEnv.hh:85
void * Resolve(const char *symbl, int mcnt=1)
void Unload(bool dodel=false)
char * GetWord(int lowcase=0)
static int UserName(uid_t uID, char *uName, int uNsz)
static int a2i(XrdSysError &, const char *emsg, const char *item, int *val, int minv=-1, int maxv=-1)
Definition XrdOuca2x.cc:45
static int a2sz(XrdSysError &, const char *emsg, const char *item, long long *val, long long minv=-1, long long maxv=-1)
Definition XrdOuca2x.cc:257
static int a2ll(XrdSysError &, const char *emsg, const char *item, long long *val, long long minv=-1, long long maxv=-1)
Definition XrdOuca2x.cc:70
static int a2tm(XrdSysError &, const char *emsg, const char *item, int *val, int minv=-1, int maxv=-1)
Definition XrdOuca2x.cc:288
bool Config(const char *config_filename, const char *parameters)
Parse configuration file.
virtual bool ConfigDecision(const char *params)
static size_t s_maxNumAccess
virtual bool ConfigPurgePin(const char *params)
virtual int Get(const char *Aname, void *Aval, int Avsz, const char *Path, int fd=-1)=0
virtual int Set(const char *Aname, const void *Aval, int Avsz, const char *Path, int fd=-1, int isNew=0)=0
const char * trace_what_strings[]
@ CSChk_Cache
long long m_hdfsbsize
used with m_hdfsmode, default 128MB
Definition XrdPfc.hh:116
long long m_RamAbsAvailable
available from configuration
Definition XrdPfc.hh:110
long long m_flushCnt
nuber of unsynced blcoks on disk before flush is called
Definition XrdPfc.hh:117
int m_accHistorySize
max number of entries in access history part of cinfo file
Definition XrdPfc.hh:101
int m_wqueue_threads
number of threads writing blocks to disk
Definition XrdPfc.hh:113
bool m_write_through
flag indicating write-through mode is enabled
Definition XrdPfc.hh:84
long long m_diskTotalSpace
total disk space on configured partition or oss space
Definition XrdPfc.hh:92
long long m_fileUsageMax
cache purge - files usage maximum
Definition XrdPfc.hh:97
long long m_fileUsageBaseline
cache purge - files usage baseline
Definition XrdPfc.hh:95
int m_dirStatsStoreDepth
depth to which statistics should be collected
Definition XrdPfc.hh:107
bool m_allow_xrdpfc_command
flag for enabling access to /xrdpfc-command/ functionality.
Definition XrdPfc.hh:86
long long m_diskUsageHWM
cache purge - disk usage high water mark
Definition XrdPfc.hh:94
int m_prefetch_max_blocks
maximum number of blocks to prefetch per file
Definition XrdPfc.hh:114
bool m_cs_ChkTLS
Allow TLS.
Definition XrdPfc.hh:121
long long m_fileUsageNominal
cache purge - files usage nominal
Definition XrdPfc.hh:96
int m_cs_Chk
Checksum check.
Definition XrdPfc.hh:120
int m_purgeAgeBasedPeriod
peform cold file / uvkeep purge every this many purge cycles
Definition XrdPfc.hh:100
bool m_hdfsmode
flag for enabling block-level operation
Definition XrdPfc.hh:85
int m_purgeColdFilesAge
purge files older than this age
Definition XrdPfc.hh:99
std::string m_data_space
oss space for data files
Definition XrdPfc.hh:89
long long m_diskUsageLWM
cache purge - disk usage low water mark
Definition XrdPfc.hh:93
int m_RamKeepStdBlocks
number of standard-sized blocks kept after release
Definition XrdPfc.hh:111
long long m_bufferSize
prefetch buffer size, default 1MB
Definition XrdPfc.hh:109
int m_dirStatsInterval
time between resource monitor statistics dump in seconds
Definition XrdPfc.hh:105
std::string m_meta_space
oss space for metadata files (cinfo)
Definition XrdPfc.hh:90
int m_wqueue_blocks
maximum number of blocks written per write-queue loop
Definition XrdPfc.hh:112
double m_onlyIfCachedMinFrac
minimum fraction of downloaded file, used by only-if-cached CGI option
Definition XrdPfc.hh:124
time_t m_cs_UVKeep
unverified checksum cache keep
Definition XrdPfc.hh:119
int m_dirStatsMaxDepth
maximum depth for statistics write out
Definition XrdPfc.hh:106
int m_purgeInterval
sleep interval between cache purges
Definition XrdPfc.hh:98
long long m_onlyIfCachedMinSize
minumum size of downloaded file, used by only-if-cached CGI option
Definition XrdPfc.hh:123
std::string m_writemodeRaw
Definition XrdPfc.hh:137
std::string m_diskUsageLWM
Definition XrdPfc.hh:131
std::string m_diskUsageHWM
Definition XrdPfc.hh:132
std::string m_fileUsageBaseline
Definition XrdPfc.hh:133
std::string m_fileUsageNominal
Definition XrdPfc.hh:134
std::string m_flushRaw
Definition XrdPfc.hh:136
std::string m_fileUsageMax
Definition XrdPfc.hh:135