xrootd
XrdThrottleManager.hh
Go to the documentation of this file.
1 
2 /*
3  * XrdThrottleManager
4  *
5  * This class provides an implementation of a throttle manager.
6  * The throttled manager purposely pause if the bandwidth, IOPS
7  * rate, or number of outstanding IO requests is sustained above
8  * a certain level.
9  *
10  * The XrdThrottleManager is user-aware and provides fairshare.
11  *
12  * This works by having a separate thread periodically refilling
13  * each user's shares.
14  *
15  * Note that we do not actually keep close track of users, but rather
16  * put them into a hash. This way, we can pretend there's a constant
17  * number of users and use a lock-free algorithm.
18  */
19 
20 #ifndef __XrdThrottleManager_hh_
21 #define __XrdThrottleManager_hh_
22 
23 #ifdef __GNUC__
24 #define likely(x) __builtin_expect(!!(x), 1)
25 #define unlikely(x) __builtin_expect(!!(x), 0)
26 #else
27 #define likely(x) x
28 #define unlikely(x) x
29 #endif
30 
31 #include <string>
32 #include <vector>
33 #include <ctime>
34 #include <mutex>
35 #include <unordered_map>
36 #include <memory>
37 
38 #include "XrdSys/XrdSysPthread.hh"
39 
40 class XrdSysError;
41 class XrdOucTrace;
42 class XrdThrottleTimer;
43 class XrdXrootdGStream;
44 
46 {
47 
48 friend class XrdThrottleTimer;
49 
50 public:
51 
52 void Init();
53 
54 bool OpenFile(const std::string &entity, std::string &open_error_message);
55 bool CloseFile(const std::string &entity);
56 
57 void Apply(int reqsize, int reqops, int uid);
58 
59 bool IsThrottling() {return (m_ops_per_second > 0) || (m_bytes_per_second > 0);}
60 
61 void SetThrottles(float reqbyterate, float reqoprate, int concurrency, float interval_length)
62  {m_interval_length_seconds = interval_length; m_bytes_per_second = reqbyterate;
63  m_ops_per_second = reqoprate; m_concurrency_limit = concurrency;}
64 
65 void SetLoadShed(std::string &hostname, unsigned port, unsigned frequency)
66  {m_loadshed_host = hostname; m_loadshed_port = port; m_loadshed_frequency = frequency;}
67 
68 void SetMaxOpen(unsigned long max_open) {m_max_open = max_open;}
69 
70 void SetMaxConns(unsigned long max_conns) {m_max_conns = max_conns;}
71 
72 void SetMonitor(XrdXrootdGStream *gstream) {m_gstream = gstream;}
73 
74 //int Stats(char *buff, int blen, int do_sync=0) {return m_pool.Stats(buff, blen, do_sync);}
75 
76 static
77 int GetUid(const char *username);
78 
80 
81 void PrepLoadShed(const char *opaque, std::string &lsOpaque);
82 
83 bool CheckLoadShed(const std::string &opaque);
84 
85 void PerformLoadShed(const std::string &opaque, std::string &host, unsigned &port);
86 
88 
89  ~XrdThrottleManager() {} // The buffmanager is never deleted
90 
91 protected:
92 
93 void StopIOTimer(struct timespec);
94 
95 private:
96 
97 void Recompute();
98 
99 void RecomputeInternal();
100 
101 static
102 void * RecomputeBootstrap(void *pp);
103 
104 int WaitForShares();
105 
106 void GetShares(int &shares, int &request);
107 
108 void StealShares(int uid, int &reqsize, int &reqops);
109 
112 
114 
115 // Controls for the various rates.
120 
121 // Maintain the shares
122 static const
124 std::vector<int> m_primary_bytes_shares;
125 std::vector<int> m_secondary_bytes_shares;
126 std::vector<int> m_primary_ops_shares;
127 std::vector<int> m_secondary_ops_shares;
129 
130 // Active IO counter
132 struct timespec m_io_wait;
133 unsigned m_io_total{0};
134 // Stable IO counters - must hold m_compute_var lock when reading/writing;
136 int m_stable_io_total{0}; // It would take ~3 years to overflow a 32-bit unsigned integer at 100Hz of IO operations.
137 struct timespec m_stable_io_wait;
138 
139 // Load shed details
140 std::string m_loadshed_host;
144 
145 // Maximum number of open files
146 unsigned long m_max_open{0};
147 unsigned long m_max_conns{0};
148 std::unordered_map<std::string, unsigned long> m_file_counters;
149 std::unordered_map<std::string, unsigned long> m_conn_counters;
150 std::unordered_map<std::string, std::unique_ptr<std::unordered_map<pid_t, unsigned long>>> m_active_conns;
151 std::mutex m_file_mutex;
152 
153 // Monitoring handle, if configured
155 
156 static const char *TraceID;
157 
158 };
159 
161 {
162 
163 friend class XrdThrottleManager;
164 
165 public:
166 
167 void StopTimer()
168 {
169  struct timespec end_timer = {0, 0};
170 #if defined(__linux__) || defined(__APPLE__) || defined(__GNU__) || (defined(__FreeBSD_kernel__) && defined(__GLIBC__))
171  int retval = clock_gettime(clock_id, &end_timer);
172 #else
173  int retval = -1;
174 #endif
175  if (likely(retval == 0))
176  {
177  end_timer.tv_sec -= m_timer.tv_sec;
178  end_timer.tv_nsec -= m_timer.tv_nsec;
179  if (end_timer.tv_nsec < 0)
180  {
181  end_timer.tv_sec--;
182  end_timer.tv_nsec += 1000000000;
183  }
184  }
185  if (m_timer.tv_nsec != -1)
186  {
187  m_manager.StopIOTimer(end_timer);
188  }
189  m_timer.tv_sec = 0;
190  m_timer.tv_nsec = -1;
191 }
192 
194 {
195  if (!((m_timer.tv_sec == 0) && (m_timer.tv_nsec == -1)))
196  {
197  StopTimer();
198  }
199 }
200 
201 protected:
202 
204  m_manager(manager)
205 {
206 #if defined(__linux__) || defined(__APPLE__) || defined(__GNU__) || (defined(__FreeBSD_kernel__) && defined(__GLIBC__))
207  int retval = clock_gettime(clock_id, &m_timer);
208 #else
209  int retval = -1;
210 #endif
211  if (unlikely(retval == -1))
212  {
213  m_timer.tv_sec = 0;
214  m_timer.tv_nsec = 0;
215  }
216 }
217 
218 private:
220 struct timespec m_timer;
221 
222 static clockid_t clock_id;
223 };
224 
225 #endif
unsigned long m_max_conns
Definition: XrdThrottleManager.hh:147
~XrdThrottleManager()
Definition: XrdThrottleManager.hh:89
int m_stable_io_active
Definition: XrdThrottleManager.hh:135
void PerformLoadShed(const std::string &opaque, std::string &host, unsigned &port)
Definition: XrdThrottleManager.hh:45
static const int m_max_users
Definition: XrdThrottleManager.hh:123
void PrepLoadShed(const char *opaque, std::string &lsOpaque)
bool CheckLoadShed(const std::string &opaque)
static clockid_t clock_id
Definition: XrdThrottleManager.hh:222
static void * RecomputeBootstrap(void *pp)
void SetMaxOpen(unsigned long max_open)
Definition: XrdThrottleManager.hh:68
float m_ops_per_second
Definition: XrdThrottleManager.hh:118
XrdThrottleManager(XrdSysError *lP, XrdOucTrace *tP)
float m_interval_length_seconds
Definition: XrdThrottleManager.hh:116
void StopTimer()
Definition: XrdThrottleManager.hh:167
unsigned long m_max_open
Definition: XrdThrottleManager.hh:146
std::vector< int > m_secondary_bytes_shares
Definition: XrdThrottleManager.hh:125
bool CloseFile(const std::string &entity)
Definition: XrdOucTrace.hh:35
Definition: XrdSysError.hh:89
void SetLoadShed(std::string &hostname, unsigned port, unsigned frequency)
Definition: XrdThrottleManager.hh:65
XrdThrottleTimer(XrdThrottleManager &manager)
Definition: XrdThrottleManager.hh:203
Definition: XrdThrottleManager.hh:160
static int GetUid(const char *username)
std::unordered_map< std::string, unsigned long > m_file_counters
Definition: XrdThrottleManager.hh:148
void GetShares(int &shares, int &request)
int m_io_active
Definition: XrdThrottleManager.hh:131
bool OpenFile(const std::string &entity, std::string &open_error_message)
XrdThrottleManager & m_manager
Definition: XrdThrottleManager.hh:219
XrdOucTrace * m_trace
Definition: XrdThrottleManager.hh:110
struct timespec m_stable_io_wait
Definition: XrdThrottleManager.hh:137
XrdSysCondVar m_compute_var
Definition: XrdThrottleManager.hh:113
Definition: XrdSysPthread.hh:78
std::vector< int > m_primary_bytes_shares
Definition: XrdThrottleManager.hh:124
bool IsThrottling()
Definition: XrdThrottleManager.hh:59
void SetMaxConns(unsigned long max_conns)
Definition: XrdThrottleManager.hh:70
struct timespec m_timer
Definition: XrdThrottleManager.hh:220
void SetMonitor(XrdXrootdGStream *gstream)
Definition: XrdThrottleManager.hh:72
std::string m_loadshed_host
Definition: XrdThrottleManager.hh:140
struct timespec m_io_wait
Definition: XrdThrottleManager.hh:132
Definition: XrdXrootdGStream.hh:43
std::mutex m_file_mutex
Definition: XrdThrottleManager.hh:151
unsigned m_loadshed_port
Definition: XrdThrottleManager.hh:141
void SetThrottles(float reqbyterate, float reqoprate, int concurrency, float interval_length)
Definition: XrdThrottleManager.hh:61
std::vector< int > m_primary_ops_shares
Definition: XrdThrottleManager.hh:126
void StealShares(int uid, int &reqsize, int &reqops)
~XrdThrottleTimer()
Definition: XrdThrottleManager.hh:193
std::unordered_map< std::string, std::unique_ptr< std::unordered_map< pid_t, unsigned long > > > m_active_conns
Definition: XrdThrottleManager.hh:150
int m_last_round_allocation
Definition: XrdThrottleManager.hh:128
unsigned m_io_total
Definition: XrdThrottleManager.hh:133
int m_concurrency_limit
Definition: XrdThrottleManager.hh:119
int m_stable_io_total
Definition: XrdThrottleManager.hh:136
unsigned m_loadshed_frequency
Definition: XrdThrottleManager.hh:142
std::unordered_map< std::string, unsigned long > m_conn_counters
Definition: XrdThrottleManager.hh:149
int m_loadshed_limit_hit
Definition: XrdThrottleManager.hh:143
float m_bytes_per_second
Definition: XrdThrottleManager.hh:117
#define likely(x)
Definition: XrdThrottleManager.hh:27
#define unlikely(x)
Definition: XrdThrottleManager.hh:28
XrdXrootdGStream * m_gstream
Definition: XrdThrottleManager.hh:154
std::vector< int > m_secondary_ops_shares
Definition: XrdThrottleManager.hh:127
static const char * TraceID
Definition: XrdThrottleManager.hh:156
void Apply(int reqsize, int reqops, int uid)
XrdSysError * m_log
Definition: XrdThrottleManager.hh:111
void StopIOTimer(struct timespec)
XrdThrottleTimer StartIOTimer()