XRootD
Loading...
Searching...
No Matches
XrdOssThrottleFile.cc
Go to the documentation of this file.
1/***************************************************************
2 *
3 * Copyright (C) 2025, Pelican Project, Morgridge Institute for Research
4 *
5 * Licensed under the Apache License, Version 2.0 (the "License"); you
6 * may not use this file except in compliance with the License. You may
7 * obtain a copy of the License at
8 *
9 * http://www.apache.org/licenses/LICENSE-2.0
10 *
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
16 *
17 ***************************************************************/
18
19#include "XrdOuc/XrdOucEnv.hh"
21#include "XrdOss/XrdOss.hh"
23#include "XrdSfs/XrdSfsAio.hh"
28#include "XrdVersion.hh"
29
30#include <functional>
31
32namespace {
33
34class File final : public XrdOssWrapDF {
35public:
36 File(std::unique_ptr<XrdOssDF> wrapDF, XrdThrottleManager &throttle, XrdSysError *lP, XrdOucTrace *tP)
37 : XrdOssWrapDF(*wrapDF), m_log(lP), m_throttle(throttle), m_trace(tP), m_wrapped(std::move(wrapDF)) {}
38
39virtual ~File() {}
40
41virtual int Open(const char *path, int Oflag, mode_t Mode,
42 XrdOucEnv &env) override {
43
44 std::tie(m_user, m_uid) = m_throttle.GetUserInfo(env.secEnv());
45
46 std::string open_error_message;
47 if (!m_throttle.OpenFile(m_user, open_error_message)) {
48 TRACE(DEBUG, open_error_message);
49 return -EMFILE;
50 }
51
52 auto rval = wrapDF.Open(path, Oflag, Mode, env);
53
54 if (rval < 0) {
55 m_throttle.CloseFile(m_user);
56 }
57
58 return rval;
59}
60
61virtual int Close(long long *retsz) override {
62 m_throttle.CloseFile(m_user);
63 return wrapDF.Close(retsz);
64}
65
66virtual int getFD() override {return -1;}
67
68virtual off_t getMmap(void **addr) override {*addr = 0; return 0;}
69
70virtual ssize_t pgRead (void* buffer, off_t offset, size_t rdlen,
71 uint32_t* csvec, uint64_t opts) override {
72
73 return DoThrottle(rdlen, 1,
74 static_cast<ssize_t (XrdOssDF::*)(void*, off_t, size_t, uint32_t*, uint64_t)>(&XrdOssDF::pgRead),
75 buffer, offset, rdlen, csvec, opts);
76}
77
78virtual int pgRead(XrdSfsAio *aioparm, uint64_t opts) override
79{ // We disable all AIO-based reads.
80 aioparm->Result = pgRead((char *)aioparm->sfsAio.aio_buf,
81 aioparm->sfsAio.aio_offset,
82 aioparm->sfsAio.aio_nbytes,
83 aioparm->cksVec, opts);
84 aioparm->doneRead();
85 return 0;
86}
87
88virtual ssize_t pgWrite(void* buffer, off_t offset, size_t wrlen,
89 uint32_t* csvec, uint64_t opts) override {
90
91 return DoThrottle(wrlen, 1,
92 static_cast<ssize_t (XrdOssDF::*)(void*, off_t, size_t, uint32_t*, uint64_t)>(&XrdOssDF::pgWrite),
93 buffer, offset, wrlen, csvec, opts);
94}
95
96virtual int pgWrite(XrdSfsAio *aioparm, uint64_t opts) override
97{ // We disable all AIO-based writes.
98 aioparm->Result = this->pgWrite((char *)aioparm->sfsAio.aio_buf,
99 aioparm->sfsAio.aio_offset,
100 aioparm->sfsAio.aio_nbytes,
101 aioparm->cksVec, opts);
102 aioparm->doneWrite();
103 return 0;
104}
105
106virtual ssize_t Read(off_t offset, size_t size) override {
107 return DoThrottle(size, 1,
108 static_cast<ssize_t (XrdOssDF::*)(off_t, size_t)>(&XrdOssDF::Read),
109 offset, size);
110}
111virtual ssize_t Read(void* buffer, off_t offset, size_t size) override {
112 return DoThrottle(size, 1,
113 static_cast<ssize_t (XrdOssDF::*)(void*, off_t, size_t)>(&XrdOssDF::Read),
114 buffer, offset, size);
115}
116
117virtual int Read(XrdSfsAio *aiop) override {
118 aiop->Result = this->Read((char *)aiop->sfsAio.aio_buf,
119 aiop->sfsAio.aio_offset,
120 aiop->sfsAio.aio_nbytes);
121 aiop->doneRead();
122 return 0;
123}
124
125virtual ssize_t ReadV(XrdOucIOVec *readV, int rdvcnt) override {
126 off_t sum = 0;
127 for (int i = 0; i < rdvcnt; ++i) {
128 sum += readV[i].size;
129 }
130 return DoThrottle(sum, rdvcnt, &XrdOssDF::ReadV, readV, rdvcnt);
131}
132
133
134virtual ssize_t Write(const void* buffer, off_t offset, size_t size) override {
135 return DoThrottle(size, 1,
136 static_cast<ssize_t (XrdOssDF::*)(const void*, off_t, size_t)>(&XrdOssDF::Write),
137 buffer, offset, size);
138}
139
140virtual int Write(XrdSfsAio *aiop) override {
141 aiop->Result = this->Write((char *)aiop->sfsAio.aio_buf,
142 aiop->sfsAio.aio_offset,
143 aiop->sfsAio.aio_nbytes);
144 aiop->doneWrite();
145 return 0;
146}
147
148private:
149
150 template <class Fn, class... Args>
151 int DoThrottle(size_t rdlen, size_t ops, Fn &&fn, Args &&... args) {
152 m_throttle.Apply(rdlen, ops, m_uid);
153 bool ok = true;
154 XrdThrottleTimer timer = m_throttle.StartIOTimer(m_uid, ok);
155 if (!ok) {
156 TRACE(DEBUG, "Throttling in progress");
157 return -EMFILE;
158 }
159 return std::invoke(fn, wrapDF, std::forward<Args>(args)...);
160 }
161
162 XrdSysError *m_log{nullptr};
163 XrdThrottleManager &m_throttle;
164 XrdOucTrace *m_trace{nullptr};
165 std::unique_ptr<XrdOssDF> m_wrapped;
166 std::string m_user;
167 uint16_t m_uid;
168
169 static constexpr char TraceID[] = "XrdThrottleFile";
170};
171
172class FileSystem final : public XrdOssWrapper {
173public:
174 FileSystem(XrdOss *oss, XrdSysLogger *log, XrdOucEnv *envP)
175 : XrdOssWrapper(*oss),
176 m_env(envP),
177 m_oss(oss),
178 m_log(new XrdSysError(log)),
179 m_trace(new XrdOucTrace(m_log.get())),
180 m_throttle(m_log.get(), m_trace.get())
181 {
182
183 m_throttle.Init();
184 if (envP)
185 {
186 auto gstream = reinterpret_cast<XrdXrootdGStream*>(envP->GetPtr("Throttle.gStream*"));
187 m_log->Say("Config", "Throttle g-stream has", gstream ? "" : " NOT", " been configured via xrootd.mongstream directive");
188 m_throttle.SetMonitor(gstream);
189 }
190 }
191
192 int Configure(const std::string &config_filename) {
193 XrdThrottle::Configuration config(*m_log, m_env);
194 if (config.Configure(config_filename)) {
195 m_log->Emsg("Config", "Unable to load configuration file", config_filename.c_str());
196 return 1;
197 }
198 m_throttle.FromConfig(config);
199 return 0;
200 }
201
202 virtual ~FileSystem() {}
203
204 virtual XrdOssDF *newFile(const char *user = 0) override {
205 std::unique_ptr<XrdOssDF> wrapped(wrapPI.newFile(user));
206 return new File(std::move(wrapped), m_throttle, m_log.get(), m_trace.get());
207 }
208
209private:
210 XrdOucEnv *m_env{nullptr};
211 std::unique_ptr<XrdOss> m_oss;
212 std::unique_ptr<XrdSysError> m_log{nullptr};
213 std::unique_ptr<XrdOucTrace> m_trace{nullptr};
214 XrdThrottleManager m_throttle;
215};
216
217} // namespace
218
219extern "C" {
220
222 const char *config_fn, const char *parms,
223 XrdOucEnv *envP) {
224 std::unique_ptr<FileSystem> fs(new FileSystem(curr_oss, logger, envP));
225 if (fs->Configure(config_fn)) {
226 XrdSysError(logger, "XrdThrottle").Say("Config", "Unable to load configuration file", config_fn);
227 return nullptr;
228 }
229 // Note the throttle is set up as an OSS.
230 // This will prevent the throttle from being layered on top of the OFS; to keep backward
231 // compatibility with old configurations, we do not cause the server to fail.
232 //
233 // Originally, XrdThrottle was used as an OFS because the loadshed code required the ability
234 // to redirect the client to a different server. This is rarely (never?) used in practice.
235 // By putting the throttle in the OSS, we benefit from the fact the OFS has first run the
236 // authorization code and has made a user name available for fairshare of the throttle.
237 envP->PutInt("XrdOssThrottle", 1);
238 return fs.release();
239}
240
242
243} // extern "C"
244
#define DEBUG(x)
XrdOss * XrdOssAddStorageSystem2(XrdOss *curr_oss, XrdSysLogger *Logger, const char *config_fn, const char *parms, XrdOucEnv *envP)
Definition XrdOssCsi.cc:455
XrdOss * XrdOssAddStorageSystem2(XrdOss *curr_oss, XrdSysLogger *logger, const char *config_fn, const char *parms, XrdOucEnv *envP)
XrdVERSIONINFO(XrdOssAddStorageSystem2, throttle)
int Mode
XrdOucString File
struct myOpts opts
off_t aio_offset
Definition XrdSfsAio.hh:49
size_t aio_nbytes
Definition XrdSfsAio.hh:48
void * aio_buf
Definition XrdSfsAio.hh:47
#define TRACE(act, x)
Definition XrdTrace.hh:63
virtual ssize_t Read(off_t offset, size_t size)
Definition XrdOss.hh:281
virtual ssize_t pgWrite(void *buffer, off_t offset, size_t wrlen, uint32_t *csvec, uint64_t opts)
Definition XrdOss.cc:198
virtual ssize_t pgRead(void *buffer, off_t offset, size_t rdlen, uint32_t *csvec, uint64_t opts)
Definition XrdOss.cc:160
virtual ssize_t ReadV(XrdOucIOVec *readV, int rdvcnt)
Definition XrdOss.cc:236
virtual ssize_t Write(const void *buffer, off_t offset, size_t size)
Definition XrdOss.hh:345
void PutInt(const char *varname, long value)
Definition XrdOucEnv.cc:268
const XrdSecEntity * secEnv() const
Definition XrdOucEnv.hh:107
void * GetPtr(const char *varname)
Definition XrdOucEnv.cc:281
uint32_t * cksVec
Definition XrdSfsAio.hh:63
ssize_t Result
Definition XrdSfsAio.hh:65
virtual void doneRead()=0
struct aiocb sfsAio
Definition XrdSfsAio.hh:62
virtual void doneWrite()=0
void Say(const char *text1, const char *text2=0, const char *txt3=0, const char *text4=0, const char *text5=0, const char *txt6=0)
CloseImpl< false > Close(Ctx< File > file, uint16_t timeout=0)
Factory for creating CloseImpl objects.
OpenImpl< false > Open(Ctx< File > file, Arg< std::string > url, Arg< OpenFlags::Flags > flags, Arg< Access::Mode > mode=Access::None, uint16_t timeout=0)
Factory for creating ReadImpl objects.
XrdOucEnv * envP
Definition XrdPss.cc:109