XRootD
XrdHttpTpcTPC.cc
Go to the documentation of this file.
2 #include "XrdNet/XrdNetAddr.hh"
3 #include "XrdNet/XrdNetUtils.hh"
4 #include "XrdOuc/XrdOucEnv.hh"
5 #include "XrdSec/XrdSecEntity.hh"
8 #include "XrdSys/XrdSysFD.hh"
9 #include "XrdVersion.hh"
10 
12 #include "XrdOuc/XrdOucTUtils.hh"
14 
15 #include <curl/curl.h>
16 
17 #include <dlfcn.h>
18 #include <fcntl.h>
19 
20 #include <algorithm>
21 #include <memory>
22 #include <sstream>
23 #include <stdexcept>
24 #include <thread>
25 
26 #include "XrdHttpTpcState.hh"
27 #include "XrdHttpTpcStream.hh"
28 #include "XrdHttpTpcTPC.hh"
29 #include <fstream>
30 
31 using namespace TPC;
32 
33 XrdXrootdTpcMon* TPCHandler::TPCLogRecord::tpcMonitor = 0;
34 
35 uint64_t TPCHandler::m_monid{0};
36 int TPCHandler::m_marker_period = 5;
37 size_t TPCHandler::m_block_size = 16*1024*1024;
38 size_t TPCHandler::m_small_block_size = 1*1024*1024;
39 XrdSysMutex TPCHandler::m_monid_mutex;
40 
42 
43 /******************************************************************************/
44 /* T P C H a n d l e r : : T P C L o g R e c o r d D e s t r u c t o r */
45 /******************************************************************************/
46 
47 TPCHandler::TPCLogRecord::~TPCLogRecord()
48 {
49 // Record monitoring data is enabled
50 //
51  if (tpcMonitor)
52  {XrdXrootdTpcMon::TpcInfo monInfo;
53 
54  monInfo.clID = clID.c_str();
55  monInfo.begT = begT;
56  gettimeofday(&monInfo.endT, 0);
57 
58  if (mTpcType == TpcType::Pull)
59  {monInfo.dstURL = local.c_str();
60  monInfo.srcURL = remote.c_str();
61  } else {
62  monInfo.dstURL = remote.c_str();
63  monInfo.srcURL = local.c_str();
65  }
66 
67  if (!status) monInfo.endRC = 0;
68  else if (tpc_status > 0) monInfo.endRC = tpc_status;
69  else monInfo.endRC = 1;
70  monInfo.strm = static_cast<unsigned char>(streams);
71  monInfo.fSize = (bytes_transferred < 0 ? 0 : bytes_transferred);
72  if (!isIPv6) monInfo.opts |= XrdXrootdTpcMon::TpcInfo::isIPv4;
73 
74  tpcMonitor->Report(monInfo);
75  }
76 }
77 
78 /******************************************************************************/
79 /* C u r l D e l e t e r : : o p e r a t o r ( ) */
80 /******************************************************************************/
81 
83 {
84  if (curl) curl_easy_cleanup(curl);
85 }
86 
87 
88 /******************************************************************************/
89 /* p r e p a r e U R L */
90 /******************************************************************************/
91 
92 // See XrdHttpTpcUtils::prepareOpenURL() documentation
93 std::string TPCHandler::prepareURL(XrdHttpExtReq &req) {
94  return XrdHttpTpcUtils::prepareOpenURL(req.resource, req.headers,hdr2cgimap);
95 }
96 
97 /******************************************************************************/
98 /* e n c o d e _ x r o o t d _ o p a q u e _ t o _ u r i */
99 /******************************************************************************/
100 
101 // When processing a redirection from the filesystem layer, it is permitted to return
102 // some xrootd opaque data. The quoting rules for xrootd opaque data are significantly
103 // more permissive than a URI (basically, only '&' and '=' are disallowed while some
104 // URI parsers may dislike characters like '"'). This function takes an opaque string
105 // (e.g., foo=1&bar=2&baz=") and makes it safe for all URI parsers.
106 std::string encode_xrootd_opaque_to_uri(CURL *curl, const std::string &opaque)
107 {
108  std::stringstream parser(opaque);
109  std::string sequence;
110  std::stringstream output;
111  bool first = true;
112  while (getline(parser, sequence, '&')) {
113  if (sequence.empty()) {continue;}
114  size_t equal_pos = sequence.find('=');
115  char *val = NULL;
116  if (equal_pos != std::string::npos)
117  val = curl_easy_escape(curl, sequence.c_str() + equal_pos + 1, sequence.size() - equal_pos - 1);
118  // Do not emit parameter if value exists and escaping failed.
119  if (!val && equal_pos != std::string::npos) {continue;}
120 
121  if (!first) output << "&";
122  first = false;
123  output << sequence.substr(0, equal_pos);
124  if (val) {
125  output << "=" << val;
126  curl_free(val);
127  }
128  }
129  return output.str();
130 }
131 
132 /******************************************************************************/
133 /* T P C H a n d l e r : : C o n f i g u r e C u r l C A */
134 /******************************************************************************/
135 
136 void
137 TPCHandler::ConfigureCurlCA(CURL *curl)
138 {
139  auto ca_filename = m_ca_file ? m_ca_file->CAFilename() : "";
140  auto crl_filename = m_ca_file ? m_ca_file->CRLFilename() : "";
141  if (!ca_filename.empty() && !crl_filename.empty()) {
142  curl_easy_setopt(curl, CURLOPT_CAINFO, ca_filename.c_str());
143  //Check that the CRL file contains at least one entry before setting this option to curl
144  //Indeed, an empty CRL file will make curl unhappy and therefore will fail
145  //all HTTP TPC transfers (https://github.com/xrootd/xrootd/issues/1543)
146  std::ifstream in(crl_filename, std::ifstream::ate | std::ifstream::binary);
147  if(in.tellg() > 0 && m_ca_file->atLeastOneValidCRLFound()){
148  curl_easy_setopt(curl, CURLOPT_CRLFILE, crl_filename.c_str());
149  } else {
150  std::ostringstream oss;
151  oss << "No valid CRL file has been found in the file " << crl_filename << ". Disabling CRL checking.";
152  m_log.Log(Warning,"TpcHandler",oss.str().c_str());
153  }
154  }
155  else if (!m_cadir.empty()) {
156  curl_easy_setopt(curl, CURLOPT_CAPATH, m_cadir.c_str());
157  }
158  if (!m_cafile.empty()) {
159  curl_easy_setopt(curl, CURLOPT_CAINFO, m_cafile.c_str());
160  }
161 }
162 
163 
164 bool TPCHandler::MatchesPath(const char *verb, const char *path) {
165  return !strcmp(verb, "COPY") || !strcmp(verb, "OPTIONS");
166 }
167 
168 /******************************************************************************/
169 /* P r e p a r e U R L */
170 /******************************************************************************/
171 
// Rewrites a "davs://" URL to the equivalent "https://" URL so curl can use it;
// any other input is returned unchanged.
static std::string PrepareURL(const std::string &input) {
    static const std::string davs_scheme = "davs://";
    const bool is_davs = input.compare(0, davs_scheme.size(), davs_scheme) == 0;
    return is_davs ? "https://" + input.substr(davs_scheme.size()) : input;
}
178 
179 /******************************************************************************/
180 /* T P C H a n d l e r : : P r o c e s s R e q */
181 /******************************************************************************/
182 
184  if (req.verb == "OPTIONS") {
185  return ProcessOptionsReq(req);
186  }
187  auto header = XrdOucTUtils::caseInsensitiveFind(req.headers,"credential");
188  if (header != req.headers.end()) {
189  if (header->second != "none") {
190  m_log.Emsg("ProcessReq", "COPY requested an unsupported credential type: ", header->second.c_str());
191  return req.SendSimpleResp(400, NULL, NULL, "COPY requestd an unsupported Credential type", 0);
192  }
193  }
194  header = XrdOucTUtils::caseInsensitiveFind(req.headers,"source");
195  if (header != req.headers.end()) {
196  std::string src = PrepareURL(header->second);
197  return ProcessPullReq(src, req);
198  }
199  header = XrdOucTUtils::caseInsensitiveFind(req.headers,"destination");
200  if (header != req.headers.end()) {
201  return ProcessPushReq(header->second, req);
202  }
203  m_log.Emsg("ProcessReq", "COPY verb requested but no source or destination specified.");
204  return req.SendSimpleResp(400, NULL, NULL, "No Source or Destination specified", 0);
205 }
206 
207 /******************************************************************************/
208 /* T P C H a n d l e r D e s t r u c t o r */
209 /******************************************************************************/
210 
212  m_sfs = NULL;
213 }
214 
215 /******************************************************************************/
216 /* T P C H a n d l e r C o n s t r u c t o r */
217 /******************************************************************************/
218 
219 TPCHandler::TPCHandler(XrdSysError *log, const char *config, XrdOucEnv *myEnv) :
220  m_desthttps(false),
221  m_fixed_route(false),
222  m_timeout(60),
223  m_first_timeout(120),
224  m_log(log->logger(), "TPC_"),
225  m_sfs(NULL),
226  m_request_manager(*myEnv, *log)
227 {
228  if (!Configure(config, myEnv)) {
229  throw std::runtime_error("Failed to configure the HTTP third-party-copy handler.");
230  }
231 
232 // Extract out the TPC monitoring object (we share it with xrootd).
233 //
234  XrdXrootdGStream *gs = (XrdXrootdGStream*)myEnv->GetPtr("Tpc.gStream*");
235  if (gs)
236  TPCLogRecord::tpcMonitor = new XrdXrootdTpcMon("http",log->logger(),*gs);
237 }
238 
239 /******************************************************************************/
240 /* T P C H a n d l e r : : P r o c e s s O p t i o n s R e q */
241 /******************************************************************************/
242 
246 int TPCHandler::ProcessOptionsReq(XrdHttpExtReq &req) {
247  return req.SendSimpleResp(200, NULL, (char *) "DAV: 1\r\nDAV: <http://apache.org/dav/propset/fs/1>\r\nAllow: HEAD,GET,PUT,PROPFIND,DELETE,OPTIONS,COPY", NULL, 0);
248 }
249 
250 /******************************************************************************/
251 /* T P C H a n d l e r : : G e t A u t h z */
252 /******************************************************************************/
253 
254 std::string TPCHandler::GetAuthz(XrdHttpExtReq &req) {
255  std::string authz;
256  auto authz_header = XrdOucTUtils::caseInsensitiveFind(req.headers,"authorization");
257  if (authz_header != req.headers.end()) {
258  std::stringstream ss;
259  ss << "authz=" << encode_str(authz_header->second);
260  authz += ss.str();
261  }
262  return authz;
263 }
264 
265 /******************************************************************************/
266 /* T P C H a n d l e r : : R e d i r e c t T r a n s f e r */
267 /******************************************************************************/
268 
269 int TPCHandler::RedirectTransfer(CURL *curl, const std::string &redirect_resource,
270  XrdHttpExtReq &req, XrdOucErrInfo &error, TPCLogRecord &rec)
271 {
272  int port;
273  const char *ptr = error.getErrText(port);
274  if ((ptr == NULL) || (*ptr == '\0') || (port == 0)) {
275  rec.status = 500;
276  std::stringstream ss;
277  ss << "Internal error: redirect without hostname";
278  logTransferEvent(LogMask::Error, rec, "REDIRECT_INTERNAL_ERROR", ss.str());
279  return req.SendSimpleResp(rec.status, NULL, NULL, generateClientErr(ss, rec).c_str(), 0);
280  }
281 
282  // Construct redirection URL taking into consideration any opaque info
283  std::string rdr_info = ptr;
284  std::string host, opaque;
285  size_t pos = rdr_info.find('?');
286  host = rdr_info.substr(0, pos);
287 
288  if (pos != std::string::npos) {
289  opaque = rdr_info.substr(pos + 1);
290  }
291 
292  std::stringstream ss;
293  ss << "Location: http" << (m_desthttps ? "s" : "") << "://" << host << ":" << port << "/" << redirect_resource;
294 
295  if (!opaque.empty()) {
296  ss << "?" << encode_xrootd_opaque_to_uri(curl, opaque);
297  }
298 
299  rec.status = 307;
300  logTransferEvent(LogMask::Info, rec, "REDIRECT", ss.str());
301  return req.SendSimpleResp(rec.status, NULL, const_cast<char *>(ss.str().c_str()),
302  NULL, 0);
303 }
304 
305 /******************************************************************************/
306 /* T P C H a n d l e r : : O p e n W a i t S t a l l */
307 /******************************************************************************/
308 
309 int TPCHandler::OpenWaitStall(XrdSfsFile &fh, const std::string &resource,
310  int mode, int openMode, const XrdSecEntity &sec,
311  const std::string &authz)
312 {
313  int open_result;
314  while (1) {
315  int orig_ucap = fh.error.getUCap();
316  fh.error.setUCap(orig_ucap | XrdOucEI::uIPv64);
317  std::string opaque;
318  size_t pos = resource.find('?');
319  // Extract the path and opaque info from the resource
320  std::string path = resource.substr(0, pos);
321 
322  if (pos != std::string::npos) {
323  opaque = resource.substr(pos + 1);
324  }
325 
326  // Append the authz information if there are some
327  if(!authz.empty()) {
328  opaque += (opaque.empty() ? "" : "&");
329  opaque += authz;
330  }
331  open_result = fh.open(path.c_str(), mode, openMode, &sec, opaque.c_str());
332 
333  if ((open_result == SFS_STALL) || (open_result == SFS_STARTED)) {
334  int secs_to_stall = fh.error.getErrInfo();
335  if (open_result == SFS_STARTED) {secs_to_stall = secs_to_stall/2 + 5;}
336  std::this_thread::sleep_for (std::chrono::seconds(secs_to_stall));
337  }
338  break;
339  }
340  return open_result;
341 }
342 
343 /******************************************************************************/
344 /* T P C H a n d l e r : : D e t e r m i n e X f e r S i z e */
345 /******************************************************************************/
346 
347 
348 
352 int TPCHandler::DetermineXferSize(CURL *curl, XrdHttpExtReq &req, State &state,
353  bool &success, TPCLogRecord &rec, bool shouldReturnErrorToClient) {
354  success = false;
355  curl_easy_setopt(curl, CURLOPT_NOBODY, 1);
356  // Set a custom timeout of 60 seconds (= CONNECT_TIMEOUT for convenience) for the HEAD request
357  curl_easy_setopt(curl, CURLOPT_TIMEOUT, CONNECT_TIMEOUT);
358  CURLcode res;
359  res = curl_easy_perform(curl);
360  //Immediately set the CURLOPT_NOBODY flag to 0 as we anyway
361  //don't want the next curl call to do be a HEAD request
362  curl_easy_setopt(curl, CURLOPT_NOBODY, 0);
363  // Reset the CURLOPT_TIMEOUT to no timeout (default)
364  curl_easy_setopt(curl, CURLOPT_TIMEOUT, 0L);
365  if (res == CURLE_HTTP_RETURNED_ERROR) {
366  std::stringstream ss;
367  ss << "Remote server failed request while fetching remote size";
368  std::stringstream ss2;
369  ss2 << ss.str() << ": " << curl_easy_strerror(res);
370  rec.status = 500;
371  logTransferEvent(LogMask::Error, rec, "SIZE_FAIL", ss2.str());
372  return shouldReturnErrorToClient ? req.SendSimpleResp(rec.status, NULL, NULL, generateClientErr(ss, rec, res).c_str(), 0) : -1;
373  } else if (state.GetStatusCode() >= 400) {
374  std::stringstream ss;
375  ss << "Remote side " << req.clienthost << " failed with status code " << state.GetStatusCode() << " while fetching remote size";
376  rec.status = 500;
377  logTransferEvent(LogMask::Error, rec, "SIZE_FAIL", ss.str());
378  return shouldReturnErrorToClient ? req.SendSimpleResp(rec.status, NULL, NULL, generateClientErr(ss, rec).c_str(), 0) : -1;
379  } else if (res) {
380  std::stringstream ss;
381  ss << "Internal transfer failure while fetching remote size";
382  std::stringstream ss2;
383  ss2 << ss.str() << " - HTTP library failed: " << curl_easy_strerror(res);
384  rec.status = 500;
385  logTransferEvent(LogMask::Error, rec, "SIZE_FAIL", ss2.str());
386  return shouldReturnErrorToClient ? req.SendSimpleResp(rec.status, NULL, NULL, generateClientErr(ss, rec, res).c_str(), 0) : -1;
387  }
388  std::stringstream ss;
389  ss << "Successfully determined remote size for pull request: "
390  << state.GetContentLength();
391  logTransferEvent(LogMask::Debug, rec, "SIZE_SUCCESS", ss.str());
392  success = true;
393  return 0;
394 }
395 
396 int TPCHandler::GetContentLengthTPCPull(CURL *curl, XrdHttpExtReq &req, uint64_t &contentLength, bool & success, TPCLogRecord &rec) {
397  State state(curl,req.tpcForwardCreds);
398  //Don't forget to copy the headers of the client's request before doing the HEAD call. Otherwise, if there is a need for authentication,
399  //it will fail
400  state.SetupHeaders(req);
401  int result;
402  //In case we cannot get the content length, we return the error to the client
403  if ((result = DetermineXferSize(curl, req, state, success, rec)) || !success) {
404  return result;
405  }
406  contentLength = state.GetContentLength();
407  return result;
408 }
409 
410 /******************************************************************************/
411 /* T P C H a n d l e r : : S e n d P e r f M a r k e r */
412 /******************************************************************************/
413 
414 int TPCHandler::SendPerfMarker(XrdHttpExtReq &req, TPCLogRecord &rec, TPC::State &state, std::string desc) {
415  std::stringstream ss;
416  const std::string crlf = "\n";
417  ss << "Perf Marker" << crlf;
418  ss << "Timestamp: " << time(NULL) << crlf;
419  ss << "Stripe Index: 0" << crlf;
420  ss << "Stripe Bytes Transferred: " << state.BytesTransferred() << crlf;
421  ss << "Total Stripe Count: 1" << crlf;
422  if (!desc.empty()) ss << "RemoteConnections: " << desc << crlf;
423  ss << "End" << crlf;
424  rec.bytes_transferred = state.BytesTransferred();
425  logTransferEvent(LogMask::Debug, rec, "PERF_MARKER");
426  return req.ChunkResp(ss.str().c_str(), 0);
427 }
428 
429 /******************************************************************************/
430 /* T P C H a n d l e r : : S e n d P e r f M a r k e r */
431 /******************************************************************************/
432 
433 int TPCHandler::SendPerfMarker(XrdHttpExtReq &req, TPCLogRecord &rec, TPC::State &state) {
434  std::stringstream ss;
435  const std::string crlf = "\n";
436  ss << "Perf Marker" << crlf;
437  ss << "Timestamp: " << time(NULL) << crlf;
438  ss << "Stripe Index: 0" << crlf;
439  ss << "Stripe Bytes Transferred: " << state.BytesTransferred() << crlf;
440  ss << "Total Stripe Count: 1" << crlf;
441  // Include the TCP connection associated with this transfer; used by
442  // the TPC client for monitoring purposes.
443  std::string desc = state.GetConnectionDescription();
444  if (!desc.empty())
445  ss << "RemoteConnections: " << desc << crlf;
446  ss << "End" << crlf;
447  rec.bytes_transferred = state.BytesTransferred();
448  logTransferEvent(LogMask::Debug, rec, "PERF_MARKER");
449 
450  return req.ChunkResp(ss.str().c_str(), 0);
451 }
452 
453 /******************************************************************************/
454 /* T P C H a n d l e r : : S e n d P e r f M a r k e r */
455 /******************************************************************************/
456 
457 int TPCHandler::SendPerfMarker(XrdHttpExtReq &req, TPCLogRecord &rec, std::vector<State*> &state,
458  off_t bytes_transferred)
459 {
460  // The 'performance marker' format is largely derived from how GridFTP works
461  // (e.g., the concept of `Stripe` is not quite so relevant here). See:
462  // https://twiki.cern.ch/twiki/bin/view/LCG/HttpTpcTechnical
463  // Example marker:
464  // Perf Marker\n
465  // Timestamp: 1537788010\n
466  // Stripe Index: 0\n
467  // Stripe Bytes Transferred: 238745\n
468  // Total Stripe Count: 1\n
469  // RemoteConnections: tcp:129.93.3.4:1234,tcp:[2600:900:6:1301:268a:7ff:fef6:a590]:2345\n
470  // End\n
471  //
472  std::stringstream ss;
473  const std::string crlf = "\n";
474  ss << "Perf Marker" << crlf;
475  ss << "Timestamp: " << time(NULL) << crlf;
476  ss << "Stripe Index: 0" << crlf;
477  ss << "Stripe Bytes Transferred: " << bytes_transferred << crlf;
478  ss << "Total Stripe Count: 1" << crlf;
479  // Build a list of TCP connections associated with this transfer; used by
480  // the TPC client for monitoring purposes.
481  bool first = true;
482  std::stringstream ss2;
483  for (std::vector<State*>::const_iterator iter = state.begin();
484  iter != state.end(); iter++)
485  {
486  std::string desc = (*iter)->GetConnectionDescription();
487  if (!desc.empty()) {
488  ss2 << (first ? "" : ",") << desc;
489  first = false;
490  }
491  }
492  if (!first)
493  ss << "RemoteConnections: " << ss2.str() << crlf;
494  ss << "End" << crlf;
495  rec.bytes_transferred = bytes_transferred;
496  logTransferEvent(LogMask::Debug, rec, "PERF_MARKER");
497 
498  return req.ChunkResp(ss.str().c_str(), 0);
499 }
500 
501 /******************************************************************************/
502 /* T P C H a n d l e r : : R u n C u r l W i t h U p d a t e s */
503 /******************************************************************************/
504 
505 int TPCHandler::RunCurlWithUpdates(CURL *curl, XrdHttpExtReq &req, State &state, TPCLogRecord &rec) {
506 
507  std::string request_label = TPCRequestManager::TPCRequest::GenerateIdentifier("tpc", req.GetSecEntity().vorg, req.mSciTag);
508  TPCRequestManager::TPCRequest request(request_label, req.mSciTag, curl);
509 
510  if (!m_request_manager.Produce(request)) {
511  int retval = req.StartChunkedResp(429, "Too Many Requests",
512  "Unable to accept HTTP-TPC requests "
513  "because server is too busy. Try again later");
514  if (retval) {
515  logTransferEvent(LogMask::Error, rec, "RESPONSE_FAIL", "Failed to send the initial response to the TPC client");
516  return retval;
517  }
518  return -1;
519  }
520 
521  // curl_multi_perform is independently called in the worker thread
522  // we can however initiate a cancel here
523  int retval = req.StartChunkedResp(201, "Created", "Content-Type: text/plain");
524  if (retval) {
525  request.Cancel();
526  logTransferEvent(LogMask::Error, rec, "RESPONSE_FAIL",
527  "Failed to send the initial response to the TPC client");
528  } else {
529  logTransferEvent(LogMask::Debug, rec, "RESPONSE_START",
530  "Initial transfer response sent to the TPC client");
531  }
532 
533  // Track how long it's been since the last time we recorded more bytes being transferred.
534  off_t last_advance_bytes = 0;
535  time_t last_advance_time = time(NULL);
536  time_t transfer_start = last_advance_time;
537  CURLcode res = static_cast<CURLcode>(-1);
538 
539  // The transfer will start after this point, notify the packet marking
540  // manager
541 
542  while ((res = (CURLcode)request.WaitFor(std::chrono::seconds(m_marker_period))) < 0) {
543  auto now = time(NULL);
544  std::string conn_desc = request.GetRemoteConnDesc();
545  off_t bytes_xfer = state.BytesTransferred();
546  if (bytes_xfer > last_advance_bytes) {
547  last_advance_bytes = bytes_xfer;
548  last_advance_time = now;
549  }
550  if (SendPerfMarker(req, rec, state, conn_desc)) {
551  request.Cancel();
552  logTransferEvent(LogMask::Error, rec, "PERFMARKER_FAIL", "Failed to send a perf marker to the TPC client");
553  }
554  int timeout = (transfer_start == last_advance_time) ? m_first_timeout : m_timeout;
555  if (now > last_advance_time + timeout) {
556  const char *log_prefix = rec.log_prefix.c_str();
557  bool tpc_pull = strncmp("Pull", log_prefix, 4) == 0;
558  request.Cancel();
559  state.SetErrorCode(10);
560  std::stringstream ss;
561  ss << "Transfer failed because no bytes have been "
562  << (tpc_pull ? "received from the source (pull mode) in " : "transmitted to the destination (push mode) in ") << timeout
563  << " seconds.";
564  state.SetErrorMessage(ss.str());
565  }
566  }
567 
568  state.Flush();
569 
570  rec.bytes_transferred = state.BytesTransferred();
571  rec.tpc_status = state.GetStatusCode();
572 
573  // Explicitly finalize the stream (which will close the underlying file
574  // handle) before the response is sent. In some cases, subsequent HTTP
575  // requests can occur before the filesystem is done closing the handle -
576  // and those requests may occur against partial data.
577  state.Finalize();
578 
579  // Generate the final response back to the client.
580  std::stringstream ss;
581  bool success = false;
582  if (state.GetStatusCode() >= 400) {
583  std::string err = state.GetErrorMessage();
584  std::stringstream ss2;
585  ss2 << "Remote side failed with status code " << state.GetStatusCode();
586  if (!err.empty()) {
587  std::replace(err.begin(), err.end(), '\n', ' ');
588  ss2 << "; error message: \"" << err << "\"";
589  }
590  logTransferEvent(LogMask::Error, rec, "TRANSFER_FAIL", ss2.str());
591  ss << generateClientErr(ss2, rec);
592  } else if (state.GetErrorCode()) {
593  std::string err = state.GetErrorMessage();
594  if (err.empty()) {err = "(no error message provided)";}
595  else {std::replace(err.begin(), err.end(), '\n', ' ');}
596  std::stringstream ss2;
597  ss2 << "Error when interacting with local filesystem: " << err;
598  logTransferEvent(LogMask::Error, rec, "TRANSFER_FAIL", ss2.str());
599  ss << generateClientErr(ss2, rec);
600  } else if (res != CURLE_OK) {
601  std::stringstream ss2;
602  ss2 << "Internal transfer failure";
603  std::stringstream ss3;
604  ss3 << ss2.str() << ": " << curl_easy_strerror(res);
605  logTransferEvent(LogMask::Error, rec, "TRANSFER_FAIL", ss3.str());
606  ss << generateClientErr(ss2, rec, res);
607  } else {
608  ss << "success: Created";
609  success = true;
610  }
611 
612  if ((retval = req.ChunkResp(ss.str().c_str(), 0))) {
613  logTransferEvent(LogMask::Error, rec, "TRANSFER_ERROR",
614  "Failed to send last update to remote client");
615  return retval;
616  } else if (success) {
617  logTransferEvent(LogMask::Info, rec, "TRANSFER_SUCCESS");
618  rec.status = 0;
619  }
620  return req.ChunkResp(NULL, 0);
621 }
622 
623 /******************************************************************************/
624 /* T P C H a n d l e r : : P r o c e s s P u s h R e q */
625 /******************************************************************************/
626 
627 int TPCHandler::ProcessPushReq(const std::string & resource, XrdHttpExtReq &req) {
628  TPCLogRecord rec(req, TpcType::Push);
629  rec.log_prefix = "PushRequest";
630  rec.local = req.resource;
631  rec.remote = resource;
632  rec.m_log = &m_log;
633  char *name = req.GetSecEntity().name;
634  req.GetClientID(rec.clID);
635  if (name) rec.name = name;
636  logTransferEvent(LogMask::Info, rec, "PUSH_START", "Starting a push request");
637 
638  ManagedCurlHandle curlPtr(curl_easy_init());
639  auto curl = curlPtr.get();
640  if (!curl) {
641  std::stringstream ss;
642  ss << "Failed to initialize internal transfer resources";
643  rec.status = 500;
644  logTransferEvent(LogMask::Error, rec, "PUSH_FAIL", ss.str());
645  return req.SendSimpleResp(rec.status, NULL, NULL, generateClientErr(ss, rec).c_str(), 0);
646  }
647  curl_easy_setopt(curl, CURLOPT_NOSIGNAL, 1);
648  curl_easy_setopt(curl, CURLOPT_HTTP_VERSION, (long) CURL_HTTP_VERSION_1_1);
649  curl_easy_setopt(curl, CURLOPT_CONNECTTIMEOUT, CONNECT_TIMEOUT);
650  auto query_header = XrdOucTUtils::caseInsensitiveFind(req.headers,"xrd-http-fullresource");
651  std::string redirect_resource = req.resource;
652  if (query_header != req.headers.end()) {
653  redirect_resource = query_header->second;
654  }
655 
656  AtomicBeg(m_monid_mutex);
657  uint64_t file_monid = AtomicInc(m_monid);
658  AtomicEnd(m_monid_mutex);
659  std::unique_ptr<XrdSfsFile> fh(m_sfs->newFile(name, file_monid));
660  if (!fh.get()) {
661  rec.status = 500;
662  std::stringstream ss;
663  ss << "Failed to initialize internal transfer file handle";
664  logTransferEvent(LogMask::Error, rec, "OPEN_FAIL",
665  ss.str());
666  return req.SendSimpleResp(rec.status, NULL, NULL, generateClientErr(ss, rec).c_str(), 0);
667  }
668  std::string full_url = prepareURL(req);
669 
670  std::string authz = GetAuthz(req);
671 
672  int open_results = OpenWaitStall(*fh, full_url, SFS_O_RDONLY, 0644,
673  req.GetSecEntity(), authz);
674  if (SFS_REDIRECT == open_results) {
675  int result = RedirectTransfer(curl, redirect_resource, req, fh->error, rec);
676  return result;
677  } else if (SFS_OK != open_results) {
678  int code;
679  std::stringstream ss;
680  const char *msg = fh->error.getErrText(code);
681  if (msg == NULL) ss << "Failed to open local resource";
682  else ss << msg;
683  rec.status = 400;
684  if (code == EACCES) rec.status = 401;
685  else if (code == EEXIST) rec.status = 412;
686  logTransferEvent(LogMask::Error, rec, "OPEN_FAIL", msg);
687  int resp_result = req.SendSimpleResp(rec.status, NULL, NULL, generateClientErr(ss, rec).c_str(), 0);
688  fh->close();
689  return resp_result;
690  }
691  ConfigureCurlCA(curl);
692  curl_easy_setopt(curl, CURLOPT_URL, resource.c_str());
693 
694  Stream stream(std::move(fh), 0, 0, m_log);
695  State state(0, stream, curl, true, req.tpcForwardCreds);
696  state.SetupHeaders(req);
697 
698  return RunCurlWithUpdates(curl, req, state, rec);
699 }
700 
701 /******************************************************************************/
702 /* T P C H a n d l e r : : P r o c e s s P u l l R e q */
703 /******************************************************************************/
704 
705 int TPCHandler::ProcessPullReq(const std::string &resource, XrdHttpExtReq &req) {
706  TPCLogRecord rec(req,TpcType::Pull);
707  rec.log_prefix = "PullRequest";
708  rec.local = req.resource;
709  rec.remote = resource;
710  rec.m_log = &m_log;
711  char *name = req.GetSecEntity().name;
712  req.GetClientID(rec.clID);
713  if (name) rec.name = name;
714  logTransferEvent(LogMask::Info, rec, "PULL_START", "Starting a pull request");
715 
716  ManagedCurlHandle curlPtr(curl_easy_init());
717  auto curl = curlPtr.get();
718  if (!curl) {
719  std::stringstream ss;
720  ss << "Failed to initialize internal transfer resources";
721  rec.status = 500;
722  logTransferEvent(LogMask::Error, rec, "PULL_FAIL", ss.str());
723  return req.SendSimpleResp(rec.status, NULL, NULL, generateClientErr(ss, rec).c_str(), 0);
724  }
725  // ddavila 2023-01-05:
726  // The following change was required by the Rucio/SENSE project where
727  // multiple IP addresses, each from a different subnet, are assigned to a
728  // single server and routed differently by SENSE.
729  // The above requires the server to utilize the same IP, that was used to
730  // start the TPC, for the resolution of the given TPC instead of
731  // using any of the IPs available.
732  if (m_fixed_route){
733  XrdNetAddr *nP;
734  int numIP = 0;
735  char buff[1024];
736  char * ip;
737 
738  // Get the hostname used to contact the server from the http header
739  auto host_header = XrdOucTUtils::caseInsensitiveFind(req.headers,"host");
740  std::string host_used;
741  if (host_header != req.headers.end()) {
742  host_used = host_header->second;
743  }
744 
745  // Get the IP addresses associated with the above hostname
746  XrdNetUtils::GetAddrs(host_used.c_str(), &nP, numIP, XrdNetUtils::prefAuto, 0);
747  int ip_size = nP[0].Format(buff, 1024, XrdNetAddrInfo::fmtAddr,XrdNetAddrInfo::noPort);
748  ip = (char *)malloc(ip_size-1);
749 
750  // Substring to get only the address, remove brackets and garbage
751  memcpy(ip, buff+1, ip_size-2);
752  ip[ip_size-2]='\0';
753  logTransferEvent(LogMask::Info, rec, "LOCAL IP", ip);
754 
755  curl_easy_setopt(curl, CURLOPT_INTERFACE, ip);
756  }
757  curl_easy_setopt(curl, CURLOPT_NOSIGNAL, 1);
758  curl_easy_setopt(curl, CURLOPT_HTTP_VERSION, (long) CURL_HTTP_VERSION_1_1);
759  curl_easy_setopt(curl, CURLOPT_SOCKOPTDATA , &rec);
760  curl_easy_setopt(curl, CURLOPT_CONNECTTIMEOUT, CONNECT_TIMEOUT);
761  std::unique_ptr<XrdSfsFile> fh(m_sfs->newFile(name, m_monid++));
762  if (!fh.get()) {
763  std::stringstream ss;
764  ss << "Failed to initialize internal transfer file handle";
765  rec.status = 500;
766  logTransferEvent(LogMask::Error, rec, "PULL_FAIL", ss.str());
767  return req.SendSimpleResp(rec.status, NULL, NULL, generateClientErr(ss, rec).c_str(), 0);
768  }
769  auto query_header = XrdOucTUtils::caseInsensitiveFind(req.headers,"xrd-http-fullresource");
770  std::string redirect_resource = req.resource;
771  if (query_header != req.headers.end()) {
772  redirect_resource = query_header->second;
773  }
775  auto overwrite_header = XrdOucTUtils::caseInsensitiveFind(req.headers,"overwrite");
776  if ((overwrite_header == req.headers.end()) || (overwrite_header->second == "T")) {
777  if (! usingEC) mode = SFS_O_TRUNC;
778  }
779  int streams = 1;
780  {
781  auto streams_header = XrdOucTUtils::caseInsensitiveFind(req.headers,"x-number-of-streams");
782  if (streams_header != req.headers.end()) {
783  int stream_req = -1;
784  try {
785  stream_req = std::stol(streams_header->second);
786  } catch (...) { // Handled below
787  }
788  if (stream_req < 0 || stream_req > 100) {
789  std::stringstream ss;
790  ss << "Invalid request for number of streams";
791  rec.status = 400;
792  logTransferEvent(LogMask::Info, rec, "INVALID_REQUEST", ss.str());
793  return req.SendSimpleResp(rec.status, NULL, NULL, generateClientErr(ss, rec).c_str(), 0);
794  }
795  streams = stream_req == 0 ? 1 : stream_req;
796  }
797  }
798  rec.streams = streams;
799  std::string full_url = prepareURL(req);
800  std::string authz = GetAuthz(req);
801  curl_easy_setopt(curl, CURLOPT_URL, resource.c_str());
802  ConfigureCurlCA(curl);
803  uint64_t sourceFileContentLength = 0;
804  {
805  //Get the content-length of the source file and pass it to the OSS layer
806  //during the open
807  bool success;
808  GetContentLengthTPCPull(curl, req, sourceFileContentLength, success, rec);
809  if(success) {
810  //In the case we cannot get the information from the source server (offline or other error)
811  //we just don't add the size information to the opaque of the local file to open
812  full_url += "&oss.asize=" + std::to_string(sourceFileContentLength);
813  } else {
814  // In the case the GetContentLength is not successful, an error will be returned to the client
815  // just exit here so we don't open the file!
816  return 0;
817  }
818  }
819  int open_result = OpenWaitStall(*fh, full_url, mode|SFS_O_WRONLY,
820  0644 | SFS_O_MKPTH,
821  req.GetSecEntity(), authz);
822  if (SFS_REDIRECT == open_result) {
823  int result = RedirectTransfer(curl, redirect_resource, req, fh->error, rec);
824  return result;
825  } else if (SFS_OK != open_result) {
826  int code;
827  std::stringstream ss;
828  const char *msg = fh->error.getErrText(code);
829  if ((msg == NULL) || (*msg == '\0')) ss << "Failed to open local resource";
830  else ss << msg;
831  rec.status = 400;
832  if (code == EACCES) rec.status = 401;
833  else if (code == EEXIST) rec.status = 412;
834  logTransferEvent(LogMask::Error, rec, "OPEN_FAIL", ss.str());
835  int resp_result = req.SendSimpleResp(rec.status, NULL, NULL,
836  generateClientErr(ss, rec).c_str(), 0);
837  fh->close();
838  return resp_result;
839  }
840  Stream stream(std::move(fh), streams * m_pipelining_multiplier, streams > 1 ? m_block_size : m_small_block_size, m_log);
841  State state(0, stream, curl, false, req.tpcForwardCreds);
842  state.SetupHeaders(req);
843  state.SetContentLength(sourceFileContentLength);
844 
845  if (streams > 1) {
846  return RunCurlWithStreams(req, state, streams, rec);
847  } else {
848  return RunCurlWithUpdates(curl, req, state, rec);
849  }
850 }
851 
852 /******************************************************************************/
853 /* T P C H a n d l e r : : l o g T r a n s f e r E v e n t */
854 /******************************************************************************/
855 
856 void TPCHandler::logTransferEvent(LogMask mask, const TPCLogRecord &rec,
857  const std::string &event, const std::string &message)
858 {
859  if (!(m_log.getMsgMask() & mask)) {return;}
860 
861  std::stringstream ss;
862  ss << "event=" << event << ", local=" << rec.local << ", remote=" << rec.remote;
863  if (rec.name.empty())
864  ss << ", user=(anonymous)";
865  else
866  ss << ", user=" << rec.name;
867  if (rec.streams != 1)
868  ss << ", streams=" << rec.streams;
869  if (rec.bytes_transferred >= 0)
870  ss << ", bytes_transferred=" << rec.bytes_transferred;
871  if (rec.status >= 0)
872  ss << ", status=" << rec.status;
873  if (rec.tpc_status >= 0)
874  ss << ", tpc_status=" << rec.tpc_status;
875  if (!message.empty())
876  ss << "; " << message;
877  m_log.Log(mask, rec.log_prefix.c_str(), ss.str().c_str());
878 }
879 
880 std::string TPCHandler::generateClientErr(std::stringstream &err_ss, const TPCLogRecord &rec, CURLcode cCode) {
881  std::stringstream ssret;
882  ssret << "failure: " << err_ss.str() << ", local=" << rec.local <<", remote=" << rec.remote;
883  if(cCode != CURLcode::CURLE_OK) {
884  ssret << ", HTTP library failure=" << curl_easy_strerror(cCode);
885  }
886  return ssret.str();
887 }
888 /******************************************************************************/
889 /* X r d H t t p G e t E x t H a n d l e r */
890 /******************************************************************************/
891 
892 extern "C" {
893 
894 XrdHttpExtHandler *XrdHttpGetExtHandler(XrdSysError *log, const char * config, const char * /*parms*/, XrdOucEnv *myEnv) {
895  if (curl_global_init(CURL_GLOBAL_DEFAULT)) {
896  log->Emsg("TPCInitialize", "libcurl failed to initialize");
897  return NULL;
898  }
899 
900  TPCHandler *retval{NULL};
901  if (!config) {
902  log->Emsg("TPCInitialize", "TPC handler requires a config filename in order to load");
903  return NULL;
904  }
905  try {
906  log->Emsg("TPCInitialize", "Will load configuration for the TPC handler from", config);
907  retval = new TPCHandler(log, config, myEnv);
908  } catch (std::runtime_error &re) {
909  log->Emsg("TPCInitialize", "Encountered a runtime failure when loading ", re.what());
910  //printf("Provided env vars: %p, XrdInet*: %p\n", myEnv, myEnv->GetPtr("XrdInet*"));
911  }
912  return retval;
913 }
914 
915 }
void CURL
static std::string PrepareURL(const std::string &input)
XrdVERSIONINFO(XrdHttpGetExtHandler, HttpTPC)
XrdHttpExtHandler * XrdHttpGetExtHandler(XrdSysError *log, const char *config, const char *, XrdOucEnv *myEnv)
std::string encode_xrootd_opaque_to_uri(CURL *curl, const std::string &opaque)
std::string encode_str(const std::string &str)
bool Debug
void getline(uchar *buff, int blen)
#define SFS_REDIRECT
#define SFS_O_MKPTH
#define SFS_STALL
#define SFS_O_RDONLY
#define SFS_STARTED
#define SFS_O_WRONLY
#define SFS_O_CREAT
int XrdSfsFileOpenMode
#define SFS_OK
#define SFS_O_TRUNC
#define AtomicInc(x)
#define AtomicBeg(Mtx)
#define AtomicEnd(Mtx)
@ Error
int GetStatusCode() const
off_t BytesTransferred() const
void SetErrorMessage(const std::string &error_msg)
int GetErrorCode() const
std::string GetErrorMessage() const
std::string GetConnectionDescription()
void SetupHeaders(XrdHttpExtReq &req)
void SetContentLength(const off_t content_length)
off_t GetContentLength() const
void SetErrorCode(int error_code)
TPCHandler(XrdSysError *log, const char *config, XrdOucEnv *myEnv)
virtual int ProcessReq(XrdHttpExtReq &req)
virtual ~TPCHandler()
virtual bool MatchesPath(const char *verb, const char *path)
Tells if the incoming path is recognized as one of the paths that have to be processed.
static std::string GenerateIdentifier(const std::string &label, const char *vorg, const int scitag)
bool Produce(TPCRequest &handler)
std::string clienthost
int ChunkResp(const char *body, long long bodylen)
Send a (potentially partial) body in a chunked response; invoking with NULL body.
void GetClientID(std::string &clid)
std::map< std::string, std::string > & headers
std::string resource
std::string verb
int StartChunkedResp(int code, const char *desc, const char *header_to_add)
Starts a chunked response; body of request is sent over multiple parts using the SendChunkResp.
const XrdSecEntity & GetSecEntity() const
int SendSimpleResp(int code, const char *desc, const char *header_to_add, const char *body, long long bodylen)
Sends a basic response. If the length is < 0 then it is calculated internally.
static std::string prepareOpenURL(const std::string &reqResource, std::map< std::string, std::string > &reqHeaders, const std::map< std::string, std::string > &hdr2cgimap)
static const int noPort
Do not add port number.
int Format(char *bAddr, int bLen, fmtUse fmtType=fmtAuto, int fmtOpts=0)
@ fmtAddr
Address using suitable ipv4 or ipv6 format.
static const char * GetAddrs(const char *hSpec, XrdNetAddr *aListP[], int &aListN, AddrOpts opts=allIPMap, int pNum=PortInSpec)
Definition: XrdNetUtils.cc:239
void * GetPtr(const char *varname)
Definition: XrdOucEnv.cc:281
const char * getErrText()
void setUCap(int ucval)
Set user capabilties.
static std::map< std::string, T >::const_iterator caseInsensitiveFind(const std::map< std::string, T > &m, const std::string &lowerCaseSearchKey)
Definition: XrdOucTUtils.hh:79
char * vorg
Entity's virtual organization(s)
Definition: XrdSecEntity.hh:71
char * name
Entity's name.
Definition: XrdSecEntity.hh:69
virtual XrdSfsFile * newFile(char *user=0, int MonID=0)=0
XrdOucErrInfo & error
virtual int open(const char *fileName, XrdSfsFileOpenMode openMode, mode_t createMode, const XrdSecEntity *client=0, const char *opaque=0)=0
virtual int close()=0
int Emsg(const char *esfx, int ecode, const char *text1, const char *text2=0)
Definition: XrdSysError.cc:95
XrdSysLogger * logger(XrdSysLogger *lp=0)
Definition: XrdSysError.hh:141
int getMsgMask()
Definition: XrdSysError.hh:156
void Log(int mask, const char *esfx, const char *text1, const char *text2=0, const char *text3=0)
Definition: XrdSysError.hh:133
std::unique_ptr< CURL, CurlDeleter > ManagedCurlHandle
@ Warning
void operator()(CURL *curl)
static const int uIPv64
ucap: Supports only IPv4 info
static const int isaPush