XRootD
XrdClFileStateHandler.cc
Go to the documentation of this file.
1 //------------------------------------------------------------------------------
2 // Copyright (c) 2011-2014 by European Organization for Nuclear Research (CERN)
3 // Author: Lukasz Janyst <ljanyst@cern.ch>
4 //------------------------------------------------------------------------------
5 // This file is part of the XRootD software suite.
6 //
7 // XRootD is free software: you can redistribute it and/or modify
8 // it under the terms of the GNU Lesser General Public License as published by
9 // the Free Software Foundation, either version 3 of the License, or
10 // (at your option) any later version.
11 //
12 // XRootD is distributed in the hope that it will be useful,
13 // but WITHOUT ANY WARRANTY; without even the implied warranty of
14 // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
15 // GNU General Public License for more details.
16 //
17 // You should have received a copy of the GNU Lesser General Public License
18 // along with XRootD. If not, see <http://www.gnu.org/licenses/>.
19 //
20 // In applying this licence, CERN does not waive the privileges and immunities
21 // granted to it by virtue of its status as an Intergovernmental Organization
22 // or submit itself to any jurisdiction.
23 //------------------------------------------------------------------------------
24 
26 #include "XrdCl/XrdClURL.hh"
27 #include "XrdCl/XrdClLog.hh"
28 #include "XrdCl/XrdClStatus.hh"
29 #include "XrdCl/XrdClDefaultEnv.hh"
31 #include "XrdCl/XrdClConstants.hh"
35 #include "XrdCl/XrdClMonitor.hh"
36 #include "XrdCl/XrdClFileTimer.hh"
38 #include "XrdCl/XrdClJobManager.hh"
40 #include "XrdCl/XrdClAnyObject.hh"
41 #include "XrdCl/XrdClUtils.hh"
42 
43 #ifdef WITH_XRDEC
44 #include "XrdCl/XrdClEcHandler.hh"
45 #endif
46 
47 #include "XrdOuc/XrdOucCRC.hh"
49 #include "XrdOuc/XrdOucUtils.hh"
50 
52 #include "XrdSys/XrdSysPageSize.hh"
53 #include "XrdSys/XrdSysPthread.hh"
54 
55 #include <sstream>
56 #include <memory>
57 #include <numeric>
58 #include <sys/time.h>
59 #include <uuid/uuid.h>
60 #include <mutex>
61 
62 namespace
63 {
64  //----------------------------------------------------------------------------
65  // Helper callback for handling PgRead responses
66  //----------------------------------------------------------------------------
67  class PgReadHandler : public XrdCl::ResponseHandler
68  {
69  friend class PgReadRetryHandler;
70 
71  public:
72 
73  //------------------------------------------------------------------------
74  // Constructor
75  //------------------------------------------------------------------------
76  PgReadHandler( std::shared_ptr<XrdCl::FileStateHandler> &stateHandler,
77  XrdCl::ResponseHandler *userHandler,
78  uint64_t orgOffset ) :
79  stateHandler( stateHandler ),
80  userHandler( userHandler ),
81  orgOffset( orgOffset ),
82  maincall( true ),
83  retrycnt( 0 ),
84  nbrepair( 0 )
85  {
86  }
87 
88  //------------------------------------------------------------------------
89  // Handle the response
90  //------------------------------------------------------------------------
92  XrdCl::AnyObject *response,
93  XrdCl::HostList *hostList )
94  {
95  using namespace XrdCl;
96 
97  std::unique_lock<std::mutex> lck( mtx );
98 
99  if( !maincall )
100  {
101  //--------------------------------------------------------------------
102  // We are serving PgRead retry request
103  //--------------------------------------------------------------------
104  --retrycnt;
105  if( !status->IsOK() )
106  st.reset( status );
107  else
108  {
109  delete status; // by convention other args are null (see PgReadRetryHandler)
110  ++nbrepair; // update number of repaired pages
111  }
112 
113  if( retrycnt == 0 )
114  {
115  //------------------------------------------------------------------
116  // All retries came back
117  //------------------------------------------------------------------
118  if( st->IsOK() )
119  {
120  PageInfo &pginf = XrdCl::To<PageInfo>( *resp );
121  pginf.SetNbRepair( nbrepair );
122  userHandler->HandleResponseWithHosts( st.release(), resp.release(), hosts.release() );
123  }
124  else
125  userHandler->HandleResponseWithHosts( st.release(), 0, 0 );
126  lck.unlock();
127  delete this;
128  }
129 
130  return;
131  }
132 
133  //----------------------------------------------------------------------
134  // We are serving main PgRead request
135  //----------------------------------------------------------------------
136  if( !status->IsOK() )
137  {
138  //--------------------------------------------------------------------
139  // The main PgRead request has failed
140  //--------------------------------------------------------------------
141  userHandler->HandleResponseWithHosts( status, response, hostList );
142  lck.unlock();
143  delete this;
144  return;
145  }
146 
147  maincall = false;
148 
149  //----------------------------------------------------------------------
150  // Do the integrity check
151  //----------------------------------------------------------------------
152  PageInfo *pginf = 0;
153  response->Get( pginf );
154 
155  uint64_t pgoff = pginf->GetOffset();
156  uint32_t bytesRead = pginf->GetLength();
157  std::vector<uint32_t> &cksums = pginf->GetCksums();
158  char *buffer = reinterpret_cast<char*>( pginf->GetBuffer() );
159  size_t nbpages = XrdOucPgrwUtils::csNum( pgoff, bytesRead );
160  uint32_t pgsize = XrdSys::PageSize - pgoff % XrdSys::PageSize;
161  if( pgsize > bytesRead ) pgsize = bytesRead;
162 
163  for( size_t pgnb = 0; pgnb < nbpages; ++pgnb )
164  {
165  uint32_t crcval = XrdOucCRC::Calc32C( buffer, pgsize );
166  if( crcval != cksums[pgnb] )
167  {
168  Log *log = DefaultEnv::GetLog();
169  log->Info( FileMsg, "[%p@%s] Received corrupted page, will retry page #%zu.",
170  (void*)this, stateHandler->pFileUrl->GetObfuscatedURL().c_str(), pgnb );
171 
172  XRootDStatus st = XrdCl::FileStateHandler::PgReadRetry( stateHandler, pgoff, pgsize, pgnb, buffer, this, 0 );
173  if( !st.IsOK())
174  {
175  *status = st; // the reason for this failure
176  break;
177  }
178  ++retrycnt; // update the retry counter
179  }
180 
181  bytesRead -= pgsize;
182  buffer += pgsize;
183  pgoff += pgsize;
184  pgsize = XrdSys::PageSize;
185  if( pgsize > bytesRead ) pgsize = bytesRead;
186  }
187 
188 
189  if( retrycnt == 0 )
190  {
191  //--------------------------------------------------------------------
192  // All went well!
193  //--------------------------------------------------------------------
194  userHandler->HandleResponseWithHosts( status, response, hostList );
195  lck.unlock();
196  delete this;
197  return;
198  }
199 
200  //----------------------------------------------------------------------
201  // We have to wait for retries!
202  //----------------------------------------------------------------------
203  resp.reset( response );
204  hosts.reset( hostList );
205  st.reset( status );
206  }
207 
208  void UpdateCksum( size_t pgnb, uint32_t crcval )
209  {
210  if( resp )
211  {
212  XrdCl::PageInfo *pginf = 0;
213  resp->Get( pginf );
214  pginf->GetCksums()[pgnb] = crcval;
215  }
216  }
217 
218  private:
219 
220  std::shared_ptr<XrdCl::FileStateHandler> stateHandler;
221  XrdCl::ResponseHandler *userHandler;
222  uint64_t orgOffset;
223 
224  std::unique_ptr<XrdCl::AnyObject> resp;
225  std::unique_ptr<XrdCl::HostList> hosts;
226  std::unique_ptr<XrdCl::XRootDStatus> st;
227 
228  std::mutex mtx;
229  bool maincall;
230  size_t retrycnt;
231  size_t nbrepair;
232 
233  };
234 
235  //----------------------------------------------------------------------------
236  // Helper callback for handling PgRead retries
237  //----------------------------------------------------------------------------
238  class PgReadRetryHandler : public XrdCl::ResponseHandler
239  {
240  public:
241 
242  PgReadRetryHandler( PgReadHandler *pgReadHandler, size_t pgnb ) : pgReadHandler( pgReadHandler ),
243  pgnb( pgnb )
244  {
245 
246  }
247 
248  //------------------------------------------------------------------------
249  // Handle the response
250  //------------------------------------------------------------------------
251  void HandleResponseWithHosts( XrdCl::XRootDStatus *status,
252  XrdCl::AnyObject *response,
253  XrdCl::HostList *hostList )
254  {
255  using namespace XrdCl;
256 
257  if( !status->IsOK() )
258  {
259  Log *log = DefaultEnv::GetLog();
260  log->Info( FileMsg, "[%p@%s] Failed to recover page #%zu.",
261  (void*)this, pgReadHandler->stateHandler->pFileUrl->GetObfuscatedURL().c_str(), pgnb );
262  pgReadHandler->HandleResponseWithHosts( status, response, hostList );
263  delete this;
264  return;
265  }
266 
267  XrdCl::PageInfo *pginf = 0;
268  response->Get( pginf );
269  if( pginf->GetLength() > (uint32_t)XrdSys::PageSize || pginf->GetCksums().size() != 1 )
270  {
271  Log *log = DefaultEnv::GetLog();
272  log->Info( FileMsg, "[%p@%s] Failed to recover page #%zu.",
273  (void*)this, pgReadHandler->stateHandler->pFileUrl->GetObfuscatedURL().c_str(), pgnb );
274  // we retry a page at a time so the length cannot exceed 4KB
275  DeleteArgs( status, response, hostList );
276  pgReadHandler->HandleResponseWithHosts( new XRootDStatus( stError, errDataError ), 0, 0 );
277  delete this;
278  return;
279  }
280 
281  uint32_t crcval = XrdOucCRC::Calc32C( pginf->GetBuffer(), pginf->GetLength() );
282  if( crcval != pginf->GetCksums().front() )
283  {
284  Log *log = DefaultEnv::GetLog();
285  log->Info( FileMsg, "[%p@%s] Failed to recover page #%zu.",
286  (void*)this, pgReadHandler->stateHandler->pFileUrl->GetObfuscatedURL().c_str(), pgnb );
287  DeleteArgs( status, response, hostList );
288  pgReadHandler->HandleResponseWithHosts( new XRootDStatus( stError, errDataError ), 0, 0 );
289  delete this;
290  return;
291  }
292 
293  Log *log = DefaultEnv::GetLog();
294  log->Info( FileMsg, "[%p@%s] Successfully recovered page #%zu.",
295  (void*)this, pgReadHandler->stateHandler->pFileUrl->GetObfuscatedURL().c_str(), pgnb );
296 
297  DeleteArgs( 0, response, hostList );
298  pgReadHandler->UpdateCksum( pgnb, crcval );
299  pgReadHandler->HandleResponseWithHosts( status, 0, 0 );
300  delete this;
301  }
302 
303  private:
304 
305  inline void DeleteArgs( XrdCl::XRootDStatus *status,
306  XrdCl::AnyObject *response,
307  XrdCl::HostList *hostList )
308  {
309  delete status;
310  delete response;
311  delete hostList;
312  }
313 
314  PgReadHandler *pgReadHandler;
315  size_t pgnb;
316  };
317 
318  //----------------------------------------------------------------------------
319  // Handle PgRead substitution with ordinary Read
320  //----------------------------------------------------------------------------
321  class PgReadSubstitutionHandler : public XrdCl::ResponseHandler
322  {
323  public:
324 
325  //------------------------------------------------------------------------
326  // Constructor
327  //------------------------------------------------------------------------
328  PgReadSubstitutionHandler( std::shared_ptr<XrdCl::FileStateHandler> &stateHandler,
329  XrdCl::ResponseHandler *userHandler ) :
330  stateHandler( stateHandler ),
331  userHandler( userHandler )
332  {
333  }
334 
335  //------------------------------------------------------------------------
336  // Handle the response
337  //------------------------------------------------------------------------
338  void HandleResponseWithHosts( XrdCl::XRootDStatus *status,
339  XrdCl::AnyObject *rdresp,
340  XrdCl::HostList *hostList )
341  {
342  using namespace XrdCl;
343 
344  if( !status->IsOK() )
345  {
346  userHandler->HandleResponseWithHosts( status, rdresp, hostList );
347  delete this;
348  return;
349  }
350 
351 
352  ChunkInfo *chunk = nullptr;
353  rdresp->Get( chunk );
354 
355  if( !chunk )
356  {
357  userHandler->HandleResponseWithHosts( status, rdresp, hostList );
358  delete this;
359  return;
360  }
361 
362  std::vector<uint32_t> cksums;
363  if( stateHandler->pIsChannelEncrypted )
364  {
365  size_t nbpages = chunk->length / XrdSys::PageSize;
366  if( chunk->length % XrdSys::PageSize )
367  ++nbpages;
368  cksums.reserve( nbpages );
369 
370  size_t size = chunk->length;
371  char *buffer = reinterpret_cast<char*>( chunk->buffer );
372 
373  for( size_t pg = 0; pg < nbpages; ++pg )
374  {
375  size_t pgsize = XrdSys::PageSize;
376  if( pgsize > size ) pgsize = size;
377  uint32_t crcval = XrdOucCRC::Calc32C( buffer, pgsize );
378  cksums.push_back( crcval );
379  buffer += pgsize;
380  size -= pgsize;
381  }
382  }
383 
384  PageInfo *pages = new PageInfo( chunk->offset, chunk->length,
385  chunk->buffer, std::move( cksums ) );
386  delete rdresp;
387  AnyObject *response = new AnyObject();
388  response->Set( pages );
389  userHandler->HandleResponseWithHosts( status, response, hostList );
390 
391  delete this;
392  }
393 
394  private:
395 
396  std::shared_ptr<XrdCl::FileStateHandler> stateHandler;
397  XrdCl::ResponseHandler *userHandler;
398  };
399 
400  //----------------------------------------------------------------------------
401  // Object that does things to the FileStateHandler when kXR_open returns
402  // and then calls the user handler
403  //----------------------------------------------------------------------------
404  class OpenHandler: public XrdCl::ResponseHandler
405  {
406  public:
407  //------------------------------------------------------------------------
408  // Constructor
409  //------------------------------------------------------------------------
410  OpenHandler( std::shared_ptr<XrdCl::FileStateHandler> &stateHandler,
411  XrdCl::ResponseHandler *userHandler ):
412  pStateHandler( stateHandler ),
413  pUserHandler( userHandler )
414  {
415  }
416 
417  //------------------------------------------------------------------------
418  // Handle the response
419  //------------------------------------------------------------------------
420  virtual void HandleResponseWithHosts( XrdCl::XRootDStatus *status,
421  XrdCl::AnyObject *response,
422  XrdCl::HostList *hostList )
423  {
424  using namespace XrdCl;
425 
426  //----------------------------------------------------------------------
427  // Extract the statistics info
428  //----------------------------------------------------------------------
429  OpenInfo *openInfo = 0;
430  if( status->IsOK() )
431  response->Get( openInfo );
432 #ifdef WITH_XRDEC
433  else
434  //--------------------------------------------------------------------
435  // Handle EC redirect
436  //--------------------------------------------------------------------
437  if( status->code == errRedirect )
438  {
439  std::string ecurl = status->GetErrorMessage();
440  EcHandler *ecHandler = GetEcHandler( hostList->front().url, ecurl );
441  if( ecHandler && pStateHandler->NeedFileTempl() )
442  {
443  delete status;
444  status = new XRootDStatus( stError, errNotSupported, 0,
445  "File template not supported with Ec" );
446  delete ecHandler;
447  ecHandler = 0;
448  }
449  else if( ecHandler )
450  {
451  pStateHandler->pPlugin = ecHandler; // set the plugin for the File object
452  ecHandler->Open( pStateHandler->pOpenFlags, pUserHandler, 0/*TODO figure out right value for the timeout*/ );
453  return;
454  }
455  }
456 #endif
457  //----------------------------------------------------------------------
458  // Notify the state handler and the client and say bye bye
459  //----------------------------------------------------------------------
460  pStateHandler->OnOpen( status, openInfo, hostList );
461  delete response;
462  if( pUserHandler )
463  pUserHandler->HandleResponseWithHosts( status, 0, hostList );
464  else
465  {
466  delete status;
467  delete hostList;
468  }
469  delete this;
470  }
471 
472  private:
473  std::shared_ptr<XrdCl::FileStateHandler> pStateHandler;
474  XrdCl::ResponseHandler *pUserHandler;
475  };
476 
477  //----------------------------------------------------------------------------
478  // Object that does things to the FileStateHandler when kXR_close returns
479  // and then calls the user handler
480  //----------------------------------------------------------------------------
481  class CloseHandler: public XrdCl::ResponseHandler
482  {
483  public:
484  //------------------------------------------------------------------------
485  // Constructor
486  //------------------------------------------------------------------------
487  CloseHandler( std::shared_ptr<XrdCl::FileStateHandler> &stateHandler,
488  XrdCl::ResponseHandler *userHandler,
489  XrdCl::Message *message ):
490  pStateHandler( stateHandler ),
491  pUserHandler( userHandler ),
492  pMessage( message )
493  {
494  }
495 
496  //------------------------------------------------------------------------
498  //------------------------------------------------------------------------
499  virtual ~CloseHandler()
500  {
501  delete pMessage;
502  }
503 
504  //------------------------------------------------------------------------
505  // Handle the response
506  //------------------------------------------------------------------------
507  virtual void HandleResponseWithHosts( XrdCl::XRootDStatus *status,
508  XrdCl::AnyObject *response,
509  XrdCl::HostList *hostList )
510  {
511  pStateHandler->OnClose( status );
512  if( pUserHandler )
513  pUserHandler->HandleResponseWithHosts( status, response, hostList );
514  else
515  {
516  delete response;
517  delete status;
518  delete hostList;
519  }
520 
521  delete this;
522  }
523 
524  private:
525  std::shared_ptr<XrdCl::FileStateHandler> pStateHandler;
526  XrdCl::ResponseHandler *pUserHandler;
527  XrdCl::Message *pMessage;
528  };
529 
530  //----------------------------------------------------------------------------
531  // Stateful message handler
532  //----------------------------------------------------------------------------
533  class StatefulHandler: public XrdCl::ResponseHandler
534  {
535  public:
536  //------------------------------------------------------------------------
537  // Constructor
538  //------------------------------------------------------------------------
539  StatefulHandler( std::shared_ptr<XrdCl::FileStateHandler> &stateHandler,
540  XrdCl::ResponseHandler *userHandler,
541  XrdCl::Message *message,
542  const XrdCl::MessageSendParams &sendParams ):
543  pStateHandler( stateHandler ),
544  pUserHandler( userHandler ),
545  pMessage( message ),
546  pSendParams( sendParams )
547  {
548  }
549 
550  //------------------------------------------------------------------------
551  // Destructor
552  //------------------------------------------------------------------------
553  virtual ~StatefulHandler()
554  {
555  delete pMessage;
556  delete pSendParams.chunkList;
557  delete pSendParams.kbuff;
558  }
559 
560  //------------------------------------------------------------------------
561  // Handle the response
562  //------------------------------------------------------------------------
563  virtual void HandleResponseWithHosts( XrdCl::XRootDStatus *status,
564  XrdCl::AnyObject *response,
565  XrdCl::HostList *hostList )
566  {
567  using namespace XrdCl;
568  std::unique_ptr<AnyObject> responsePtr( response );
569  pSendParams.hostList = hostList;
570 
571  //----------------------------------------------------------------------
572  // Houston we have a problem...
573  //----------------------------------------------------------------------
574  if( !status->IsOK() )
575  {
576  XrdCl::FileStateHandler::OnStateError( pStateHandler, status, pMessage, this, pSendParams );
577  return;
578  }
579 
580  //----------------------------------------------------------------------
581  // We're clear
582  //----------------------------------------------------------------------
583  responsePtr.release();
584  XrdCl::FileStateHandler::OnStateResponse( pStateHandler, status, pMessage, response, hostList );
585  if( pUserHandler )
586  pUserHandler->HandleResponseWithHosts( status, response, hostList );
587  else
588  {
589  delete status,
590  delete response;
591  delete hostList;
592  }
593  delete this;
594  }
595 
596  //------------------------------------------------------------------------
598  //------------------------------------------------------------------------
599  XrdCl::ResponseHandler *GetUserHandler()
600  {
601  return pUserHandler;
602  }
603 
604  private:
605  std::shared_ptr<XrdCl::FileStateHandler> pStateHandler;
606  XrdCl::ResponseHandler *pUserHandler;
607  XrdCl::Message *pMessage;
608  XrdCl::MessageSendParams pSendParams;
609  };
610 
611  //----------------------------------------------------------------------------
612  // Release-buffer Handler
613  //----------------------------------------------------------------------------
614  class ReleaseBufferHandler: public XrdCl::ResponseHandler
615  {
616  public:
617 
618  //------------------------------------------------------------------------
619  // Constructor
620  //------------------------------------------------------------------------
621  ReleaseBufferHandler( XrdCl::Buffer &&buffer, XrdCl::ResponseHandler *handler ) :
622  buffer( std::move( buffer ) ),
623  handler( handler )
624  {
625  }
626 
627  //------------------------------------------------------------------------
628  // Handle the response
629  //------------------------------------------------------------------------
630  virtual void HandleResponseWithHosts( XrdCl::XRootDStatus *status,
631  XrdCl::AnyObject *response,
632  XrdCl::HostList *hostList )
633  {
634  if (handler)
635  handler->HandleResponseWithHosts( status, response, hostList );
636  }
637 
638  //------------------------------------------------------------------------
639  // Get the underlying buffer
640  //------------------------------------------------------------------------
641  XrdCl::Buffer& GetBuffer()
642  {
643  return buffer;
644  }
645 
646  private:
647  XrdCl::Buffer buffer;
648  XrdCl::ResponseHandler *handler;
649  };
650 }
651 
652 namespace XrdCl
653 {
654  //----------------------------------------------------------------------------
655  // Constructor
656  //----------------------------------------------------------------------------
657  FileStateHandler::FileStateHandler( FilePlugIn *& plugin ):
658  pFileState( Closed ),
659  pStatInfo( 0 ),
660  pFileUrl( 0 ),
661  pDataServer( 0 ),
662  pLoadBalancer( 0 ),
663  pStateRedirect( 0 ),
664  pWrtRecoveryRedir( 0 ),
665  pFileHandle( 0 ),
666  pOpenMode( 0 ),
667  pOpenFlags( OpenFlags::None ),
668  pSessionId( 0 ),
669  pDoRecoverRead( true ),
670  pDoRecoverWrite( true ),
671  pFollowRedirects( true ),
672  pUseVirtRedirector( true ),
673  pIsChannelEncrypted( false ),
674  pAllowBundledClose( false ),
675  pPlugin( plugin )
676  {
677  pFileHandle = new uint8_t[4];
678  ResetMonitoringVars();
681  pLFileHandler = new LocalFileHandler();
682  }
683 
684  //------------------------------------------------------------------------
689  //------------------------------------------------------------------------
690  FileStateHandler::FileStateHandler( bool useVirtRedirector, FilePlugIn *& plugin ):
691  pFileState( Closed ),
692  pStatInfo( 0 ),
693  pFileUrl( 0 ),
694  pDataServer( 0 ),
695  pLoadBalancer( 0 ),
696  pStateRedirect( 0 ),
697  pWrtRecoveryRedir( 0 ),
698  pFileHandle( 0 ),
699  pOpenMode( 0 ),
700  pOpenFlags( OpenFlags::None ),
701  pSessionId( 0 ),
702  pDoRecoverRead( true ),
703  pDoRecoverWrite( true ),
704  pFollowRedirects( true ),
705  pUseVirtRedirector( useVirtRedirector ),
706  pAllowBundledClose( false ),
707  pPlugin( plugin )
708  {
709  pFileHandle = new uint8_t[4];
710  ResetMonitoringVars();
713  pLFileHandler = new LocalFileHandler();
714  }
715 
716  //----------------------------------------------------------------------------
717  // Destructor
718  //----------------------------------------------------------------------------
720  {
721  //--------------------------------------------------------------------------
722  // This, in principle, should never ever happen. Except for the case
723  // when we're interfaced with ROOT that may call this desctructor from
724  // its garbage collector, from its __cxa_finalize, ie. after the XrdCl lib
725  // has been finalized by the linker. So, if we don't have the log object
726  // at this point we just give up the hope.
727  //--------------------------------------------------------------------------
728  if( DefaultEnv::GetLog() && pSessionId && !pDataServer->IsLocalFile() ) // if the file object was bound to a physical connection
729  DefaultEnv::GetPostMaster()->DecFileInstCnt( *pDataServer );
730 
733 
736 
737  if( pFileState != Closed && DefaultEnv::GetLog() )
738  {
739  XRootDStatus st;
740  MonitorClose( &st );
741  ResetMonitoringVars();
742  }
743 
744  // check if the logger is still there, this is only for root, as root might
745  // have unload us already so in this case we don't want to do anything
746  if( DefaultEnv::GetLog() && pUseVirtRedirector && pFileUrl && pFileUrl->IsMetalink() )
747  {
749  registry.Release( *pFileUrl );
750  }
751 
752  delete pStatInfo;
753  delete pFileUrl;
754  delete pDataServer;
755  delete pLoadBalancer;
756  delete [] pFileHandle;
757  delete pLFileHandler;
758  }
759 
760  //----------------------------------------------------------------------------
761  // Open with file template
762  //----------------------------------------------------------------------------
764  std::shared_ptr<FileStateHandler> &self,
765  ExportedFileTemplate *templ,
766  const std::string &url,
767  OpenFlags::Flags flags,
768  uint16_t mode,
769  ResponseHandler *handler,
770  time_t timeout )
771  {
772  if( !templ )
773  return XRootDStatus( stError, errInvalidArgs, 0, "Template file not available" );
774 
775  FileStateHandlerTemplate *fht = dynamic_cast<FileStateHandlerTemplate*>( templ );
776  if( !fht )
777  return XRootDStatus( stError, errInvalidArgs, 0, "Template file invalid" );
778 
779  self->pTemplateFileWp = fht->pTemplateFileWp;
780 
781  return OpenImpl( self, url, flags, mode, handler, timeout );
782  }
783 
784  //----------------------------------------------------------------------------
785  // Open the file pointed to by the given URL
786  //----------------------------------------------------------------------------
787  XRootDStatus FileStateHandler::Open( std::shared_ptr<FileStateHandler> &self,
788  const std::string &url,
789  OpenFlags::Flags flags,
790  uint16_t mode,
791  ResponseHandler *handler,
792  time_t timeout )
793  {
794  self->pTemplateFileWp.reset();
795  return OpenImpl( self, url, flags, mode, handler, timeout );
796  }
797 
798  //----------------------------------------------------------------------------
799  // Most of Open implementation, used by Open and OpenUsingTemplate
800  //----------------------------------------------------------------------------
801  XRootDStatus FileStateHandler::OpenImpl( std::shared_ptr<FileStateHandler> &self,
802  const std::string &url,
803  OpenFlags::Flags flags,
804  uint16_t mode,
805  ResponseHandler *handler,
806  time_t timeout )
807  {
808  XrdSysMutexHelper scopedLock( self->pMutex );
809 
810  //--------------------------------------------------------------------------
811  // Check if we can proceed
812  //--------------------------------------------------------------------------
813  if( self->pFileState == Error )
814  return self->pStatus;
815 
816  if( self->pFileState == OpenInProgress )
818 
819  if( self->pFileState == CloseInProgress || self->pFileState == Opened ||
820  self->pFileState == Recovering )
821  return XRootDStatus( stError, errInvalidOp );
822 
823  self->pFileState = OpenInProgress;
824 
825  //--------------------------------------------------------------------------
826  // Check if the parameters are valid
827  //--------------------------------------------------------------------------
828  Log *log = DefaultEnv::GetLog();
829 
830  if( self->pFileUrl )
831  {
832  if( self->pUseVirtRedirector && self->pFileUrl->IsMetalink() )
833  {
835  registry.Release( *self->pFileUrl );
836  }
837  delete self->pFileUrl;
838  self->pFileUrl = 0;
839  }
840 
841  self->pFileUrl = new URL( url );
842 
843  //--------------------------------------------------------------------------
844  // Add unique uuid to each open request so replays due to error/timeout
845  // recovery can be correctly handled.
846  //--------------------------------------------------------------------------
847  URL::ParamsMap cgi = self->pFileUrl->GetParams();
848  uuid_t uuid;
849  char requuid[37]= {0};
850  uuid_generate( uuid );
851  uuid_unparse( uuid, requuid );
852  cgi["xrdcl.requuid"] = requuid;
853  self->pFileUrl->SetParams( cgi );
854 
855  if( !self->pFileUrl->IsValid() )
856  {
857  log->Error( FileMsg, "[%p@%s] Trying to open invalid url: %s",
858  (void*)self.get(), self->pFileUrl->GetPath().c_str(), url.c_str() );
859  self->pStatus = XRootDStatus( stError, errInvalidArgs );
860  self->pFileState = Closed;
861  return self->pStatus;
862  }
863 
864  //--------------------------------------------------------------------------
865  // Check if the recovery procedures should be enabled
866  //--------------------------------------------------------------------------
867  const URL::ParamsMap &urlParams = self->pFileUrl->GetParams();
868  URL::ParamsMap::const_iterator it;
869  it = urlParams.find( "xrdcl.recover-reads" );
870  if( (it != urlParams.end() && it->second == "false") ||
871  !self->pDoRecoverRead )
872  {
873  self->pDoRecoverRead = false;
874  log->Debug( FileMsg, "[%p@%s] Read recovery procedures are disabled",
875  (void*)self.get(), self->pFileUrl->GetObfuscatedURL().c_str() );
876  }
877 
878  it = urlParams.find( "xrdcl.recover-writes" );
879  if( (it != urlParams.end() && it->second == "false") ||
880  !self->pDoRecoverWrite )
881  {
882  self->pDoRecoverWrite = false;
883  log->Debug( FileMsg, "[%p@%s] Write recovery procedures are disabled",
884  (void*)self.get(), self->pFileUrl->GetObfuscatedURL().c_str() );
885  }
886 
887  //--------------------------------------------------------------------------
888  // Open the file
889  //--------------------------------------------------------------------------
890  log->Debug( FileMsg, "[%p@%s] Sending an open command", (void*)self.get(),
891  self->pFileUrl->GetObfuscatedURL().c_str() );
892 
893  self->pOpenMode = mode;
894  self->pOpenFlags = flags;
895  OpenHandler *openHandler = new OpenHandler( self, handler );
896 
897  Message *msg;
898  ClientOpenRequest *req;
899  std::string path = self->pFileUrl->GetPathWithFilteredParams();
900  MessageUtils::CreateRequest( msg, req, path.length() );
901 
902  req->requestid = kXR_open;
903  req->mode = mode;
904  req->options = (flags&0xffff) | kXR_async | kXR_retstat;
905  req->dlen = path.length();
906  URL sendUrl;
907  XRootDStatus st = FillFhTempl( self, *self->pFileUrl, msg, sendUrl );
908  if( !st.IsOK() )
909  {
910  delete openHandler;
911  self->pStatus = st;
912  self->pFileState = Closed;
913  return st;
914  }
915  msg->Append( path.c_str(), path.length(), 24 );
916 
918  MessageSendParams params; params.timeout = timeout;
919  params.followRedirects = self->pFollowRedirects;
921 
922  st = self->IssueRequest( sendUrl, msg, openHandler, params );
923 
924  if( !st.IsOK() )
925  {
926  delete openHandler;
927  self->pStatus = st;
928  self->pFileState = Closed;
929  return st;
930  }
931  return st;
932  }
933 
934  //----------------------------------------------------------------------------
935  // Close the file object
936  //----------------------------------------------------------------------------
937  XRootDStatus FileStateHandler::Close( std::shared_ptr<FileStateHandler> &self,
938  ResponseHandler *handler,
939  time_t timeout )
940  {
941  XrdSysMutexHelper scopedLock( self->pMutex );
942 
943  //--------------------------------------------------------------------------
944  // Check if we can proceed
945  //--------------------------------------------------------------------------
946  if( self->pFileState == Error )
947  return self->pStatus;
948 
949  if( self->pFileState == CloseInProgress )
951 
952  if( self->pFileState == Closed )
953  return XRootDStatus( stOK, suAlreadyDone );
954 
955  if( self->pFileState == OpenInProgress || self->pFileState == Recovering )
956  return XRootDStatus( stError, errInvalidOp );
957 
958  if( !self->pAllowBundledClose && !self->pInTheFly.empty() )
959  return XRootDStatus( stError, errInvalidOp );
960 
961  self->pFileState = CloseInProgress;
962 
963  Log *log = DefaultEnv::GetLog();
964  log->Debug( FileMsg, "[%p@%s] Sending a close command for handle %#x to %s",
965  (void*)self.get(), self->pFileUrl->GetObfuscatedURL().c_str(),
966  *((uint32_t*)self->pFileHandle), self->pDataServer->GetHostId().c_str() );
967 
968  //--------------------------------------------------------------------------
969  // Close the file
970  //--------------------------------------------------------------------------
971  Message *msg;
972  ClientCloseRequest *req;
973  MessageUtils::CreateRequest( msg, req );
974 
975  req->requestid = kXR_close;
976  memcpy( req->fhandle, self->pFileHandle, 4 );
977 
979  msg->SetSessionId( self->pSessionId );
980  CloseHandler *closeHandler = new CloseHandler( self, handler, msg );
981  MessageSendParams params;
982  params.timeout = timeout;
983  params.followRedirects = false;
984  params.stateful = true;
986 
987  XRootDStatus st = self->IssueRequest( *self->pDataServer, msg, closeHandler, params );
988 
989  if( !st.IsOK() )
990  {
991  // an invalid-session error means the connection to the server has been
992  // closed, which in turn means that the server closed the file already
993  if( st.code == errInvalidSession || st.code == errSocketDisconnected ||
995  st.code == errPollerError || st.code == errSocketError )
996  {
997  self->pFileState = Closed;
998  ResponseJob *job = new ResponseJob( closeHandler, new XRootDStatus(),
999  nullptr, nullptr );
1001  return XRootDStatus();
1002  }
1003 
1004  delete closeHandler;
1005  self->pStatus = st;
1006  self->pFileState = Error;
1007  return st;
1008  }
1009  return st;
1010  }
1011 
1012  //----------------------------------------------------------------------------
1013  // Stat the file
1014  //----------------------------------------------------------------------------
1015  XRootDStatus FileStateHandler::Stat( std::shared_ptr<FileStateHandler> &self,
1016  bool force,
1017  ResponseHandler *handler,
1018  time_t timeout )
1019  {
1020  XrdSysMutexHelper scopedLock( self->pMutex );
1021 
1022  if( self->pFileState == Error ) return self->pStatus;
1023 
1024  if( self->pFileState != Opened && self->pFileState != Recovering )
1025  return XRootDStatus( stError, errInvalidOp );
1026 
1027  //--------------------------------------------------------------------------
1028  // Return the cached info
1029  //--------------------------------------------------------------------------
1030  if( !force )
1031  {
1032  AnyObject *obj = new AnyObject();
1033  obj->Set( new StatInfo( *self->pStatInfo ) );
1034  if (handler)
1035  handler->HandleResponseWithHosts( new XRootDStatus(), obj, new HostList() );
1036  return XRootDStatus();
1037  }
1038 
1039  Log *log = DefaultEnv::GetLog();
1040  log->Debug( FileMsg, "[%p@%s] Sending a stat command for handle %#x to %s",
1041  (void*)self.get(), self->pFileUrl->GetObfuscatedURL().c_str(),
1042  *((uint32_t*)self->pFileHandle), self->pDataServer->GetHostId().c_str() );
1043 
1044  //--------------------------------------------------------------------------
1045  // Issue a new stat request
1046  // stating a file handle doesn't work (fixed in 3.2.0) so we need to
1047  // stat the pat
1048  //--------------------------------------------------------------------------
1049  Message *msg;
1050  ClientStatRequest *req;
1051  std::string path = self->pFileUrl->GetPath();
1052  MessageUtils::CreateRequest( msg, req );
1053 
1054  req->requestid = kXR_stat;
1055  memcpy( req->fhandle, self->pFileHandle, 4 );
1056 
1057  MessageSendParams params;
1058  params.timeout = timeout;
1059  params.followRedirects = false;
1060  params.stateful = true;
1062 
1064  StatefulHandler *stHandler = new StatefulHandler( self, handler, msg, params );
1065 
1066  return SendOrQueue( self, *self->pDataServer, msg, stHandler, params );
1067  }
1068 
1069  //----------------------------------------------------------------------------
1070  // Preread scattered data tracts in one operation - async
1071  //----------------------------------------------------------------------------
1072  XRootDStatus FileStateHandler::PreRead( std::shared_ptr<FileStateHandler> &self,
1073  const TractList &tracts,
1074  ResponseHandler *handler,
1075  time_t timeout )
1076  {
1077  //--------------------------------------------------------------------------
1078  // Sanity check
1079  //--------------------------------------------------------------------------
1080  XrdSysMutexHelper scopedLock( self->pMutex );
1081 
1082  if( self->pFileState == Error ) return self->pStatus;
1083 
1084  if( self->pFileState != Opened && self->pFileState != Recovering )
1085  return XRootDStatus( stError, errInvalidOp );
1086 
1087  Log *log = DefaultEnv::GetLog();
1088  log->Debug( FileMsg, "[%p@%s] Sending an read+preread command for handle %#x to %s",
1089  (void*)self.get(), self->pFileUrl->GetObfuscatedURL().c_str(),
1090  *((uint32_t*)self->pFileHandle), self->pDataServer->GetHostId().c_str() );
1091 
1092  //--------------------------------------------------------------------------
1093  // Build the message
1094  //--------------------------------------------------------------------------
1095  Message *msg;
1096  ClientReadRequest *req;
1097  MessageUtils::CreateRequest( msg, req, sizeof(readahead_list)*tracts.size() + 8 );
1098 
1099  req->requestid = kXR_read;
1100  req->offset = 0;
1101  req->rlen = 0;
1102  memcpy( req->fhandle, self->pFileHandle, 4 );
1103  req->dlen = sizeof(readahead_list)*tracts.size() + 8;
1104 
1105  static char dummyBuff[8];
1106  ChunkList *list = new ChunkList();
1107  list->push_back( ChunkInfo( 0, 0, dummyBuff ) );
1108 
1109  //--------------------------------------------------------------------------
1110  // Copy the tract info
1111  //--------------------------------------------------------------------------
1112  readahead_list *dataTract = (readahead_list*)msg->GetBuffer( 24 + 8 );
1113  for( size_t i = 0; i < tracts.size(); ++i )
1114  {
1115  dataTract[i].rlen = tracts[i].length;
1116  dataTract[i].offset = tracts[i].offset;
1117  memcpy( dataTract[i].fhandle, req->fhandle, 4 );
1118  }
1119 
1120  //--------------------------------------------------------------------------
1121  // Send the message
1122  //--------------------------------------------------------------------------
1123  MessageSendParams params;
1124  params.timeout = timeout;
1125  params.followRedirects = false;
1126  params.stateful = true;
1127  params.chunkList = list;
1129 
1131  StatefulHandler *stHandler = new StatefulHandler( self, handler, msg, params );
1132 
1133  return SendOrQueue( self, *self->pDataServer, msg, stHandler, params );
1134  }
1135 
1136  //----------------------------------------------------------------------------
1137  // Read a data chunk at a given offset - sync
1138  //----------------------------------------------------------------------------
1139  XRootDStatus FileStateHandler::Read( std::shared_ptr<FileStateHandler> &self,
1140  uint64_t offset,
1141  uint32_t size,
1142  void *buffer,
1143  ResponseHandler *handler,
1144  time_t timeout )
1145  {
1146  XrdSysMutexHelper scopedLock( self->pMutex );
1147 
1148  if( self->pFileState == Error ) return self->pStatus;
1149 
1150  if( self->pFileState != Opened && self->pFileState != Recovering )
1151  return XRootDStatus( stError, errInvalidOp );
1152 
1153  Log *log = DefaultEnv::GetLog();
1154  log->Debug( FileMsg, "[%p@%s] Sending a read command for handle %#x to %s",
1155  (void*)self.get(), self->pFileUrl->GetObfuscatedURL().c_str(),
1156  *((uint32_t*)self->pFileHandle), self->pDataServer->GetHostId().c_str() );
1157 
1158  Message *msg;
1159  ClientReadRequest *req;
1160  MessageUtils::CreateRequest( msg, req );
1161 
1162  req->requestid = kXR_read;
1163  req->offset = offset;
1164  req->rlen = size;
1165  memcpy( req->fhandle, self->pFileHandle, 4 );
1166 
1167  ChunkList *list = new ChunkList();
1168  list->push_back( ChunkInfo( offset, size, buffer ) );
1169 
1171  MessageSendParams params;
1172  params.timeout = timeout;
1173  params.followRedirects = false;
1174  params.stateful = true;
1175  params.chunkList = list;
1177  StatefulHandler *stHandler = new StatefulHandler( self, handler, msg, params );
1178 
1179  return SendOrQueue( self, *self->pDataServer, msg, stHandler, params );
1180  }
1181 
1182  //------------------------------------------------------------------------
1183  // Read data pages at a given offset
1184  //------------------------------------------------------------------------
1185  XRootDStatus FileStateHandler::PgRead( std::shared_ptr<FileStateHandler> &self,
1186  uint64_t offset,
1187  uint32_t size,
1188  void *buffer,
1189  ResponseHandler *handler,
1190  time_t timeout )
1191  {
1192  int issupported = true;
1193  AnyObject obj;
1195  int protver = 0;
1196  XRootDStatus st2 = Utils::GetProtocolVersion( *self->pDataServer, protver );
1197  if( st1.IsOK() && st2.IsOK() )
1198  {
1199  int *ptr = 0;
1200  obj.Get( ptr );
1201  issupported = ( ptr && (*ptr & kXR_suppgrw) ) && ( protver >= kXR_PROTPGRWVERSION );
1202  delete ptr;
1203  }
1204  else
1205  issupported = false;
1206 
1207  if( !issupported )
1208  {
1209  DefaultEnv::GetLog()->Debug( FileMsg, "[%p@%s] PgRead not supported; substituting with Read.",
1210  (void*)self.get(), self->pFileUrl->GetObfuscatedURL().c_str() );
1211  ResponseHandler *substitHandler = new PgReadSubstitutionHandler( self, handler );
1212  auto st = Read( self, offset, size, buffer, substitHandler, timeout );
1213  if( !st.IsOK() ) delete substitHandler;
1214  return st;
1215  }
1216 
1217  ResponseHandler* pgHandler = new PgReadHandler( self, handler, offset );
1218  auto st = PgReadImpl( self, offset, size, buffer, PgReadFlags::None, pgHandler, timeout );
1219  if( !st.IsOK() ) delete pgHandler;
1220  return st;
1221  }
1222 
1223  XRootDStatus FileStateHandler::PgReadRetry( std::shared_ptr<FileStateHandler> &self,
1224  uint64_t offset,
1225  uint32_t size,
1226  size_t pgnb,
1227  void *buffer,
1228  PgReadHandler *handler,
1229  time_t timeout )
1230  {
1231  if( size > (uint32_t)XrdSys::PageSize )
1232  return XRootDStatus( stError, errInvalidArgs, EINVAL,
1233  "PgRead retry size exceeded 4KB." );
1234 
1235  ResponseHandler *retryHandler = new PgReadRetryHandler( handler, pgnb );
1236  XRootDStatus st = PgReadImpl( self, offset, size, buffer, PgReadFlags::Retry, retryHandler, timeout );
1237  if( !st.IsOK() ) delete retryHandler;
1238  return st;
1239  }
1240 
1241  XRootDStatus FileStateHandler::PgReadImpl( std::shared_ptr<FileStateHandler> &self,
1242  uint64_t offset,
1243  uint32_t size,
1244  void *buffer,
1245  uint16_t flags,
1246  ResponseHandler *handler,
1247  time_t timeout )
1248  {
1249  XrdSysMutexHelper scopedLock( self->pMutex );
1250 
1251  if( self->pFileState == Error ) return self->pStatus;
1252 
1253  if( self->pFileState != Opened && self->pFileState != Recovering )
1254  return XRootDStatus( stError, errInvalidOp );
1255 
1256  Log *log = DefaultEnv::GetLog();
1257  log->Debug( FileMsg, "[%p@%s] Sending a pgread command for handle %#x to %s",
1258  (void*)self.get(), self->pFileUrl->GetObfuscatedURL().c_str(),
1259  *((uint32_t*)self->pFileHandle), self->pDataServer->GetHostId().c_str() );
1260 
1261  Message *msg;
1262  ClientPgReadRequest *req;
1263  MessageUtils::CreateRequest( msg, req, sizeof( ClientPgReadReqArgs ) );
1264 
1265  req->requestid = kXR_pgread;
1266  req->offset = offset;
1267  req->rlen = size;
1268  memcpy( req->fhandle, self->pFileHandle, 4 );
1269 
1270  //--------------------------------------------------------------------------
1271  // Now adjust the message size so it can hold PgRead arguments
1272  //--------------------------------------------------------------------------
1273  req->dlen = sizeof( ClientPgReadReqArgs );
1274  void *newBuf = msg->GetBuffer( sizeof( ClientPgReadRequest ) );
1275  memset( newBuf, 0, sizeof( ClientPgReadReqArgs ) );
1276  ClientPgReadReqArgs *args = reinterpret_cast<ClientPgReadReqArgs*>(
1277  msg->GetBuffer( sizeof( ClientPgReadRequest ) ) );
1278  args->reqflags = flags;
1279 
1280  ChunkList *list = new ChunkList();
1281  list->push_back( ChunkInfo( offset, size, buffer ) );
1282 
1284  MessageSendParams params;
1285  params.timeout = timeout;
1286  params.followRedirects = false;
1287  params.stateful = true;
1288  params.chunkList = list;
1290  StatefulHandler *stHandler = new StatefulHandler( self, handler, msg, params );
1291 
1292  return SendOrQueue( self, *self->pDataServer, msg, stHandler, params );
1293  }
1294 
1295  //----------------------------------------------------------------------------
1296  // Write a data chunk at a given offset - async
1297  //----------------------------------------------------------------------------
1298  XRootDStatus FileStateHandler::Write( std::shared_ptr<FileStateHandler> &self,
1299  uint64_t offset,
1300  uint32_t size,
1301  const void *buffer,
1302  ResponseHandler *handler,
1303  time_t timeout )
1304  {
1305  XrdSysMutexHelper scopedLock( self->pMutex );
1306 
1307  if( self->pFileState == Error ) return self->pStatus;
1308 
1309  if( self->pFileState != Opened && self->pFileState != Recovering )
1310  return XRootDStatus( stError, errInvalidOp );
1311 
1312  Log *log = DefaultEnv::GetLog();
1313  log->Debug( FileMsg, "[%p@%s] Sending a write command for handle %#x to %s",
1314  (void*)self.get(), self->pFileUrl->GetObfuscatedURL().c_str(),
1315  *((uint32_t*)self->pFileHandle), self->pDataServer->GetHostId().c_str() );
1316 
1317  Message *msg;
1318  ClientWriteRequest *req;
1319  MessageUtils::CreateRequest( msg, req );
1320 
1321  req->requestid = kXR_write;
1322  req->offset = offset;
1323  req->dlen = size;
1324  memcpy( req->fhandle, self->pFileHandle, 4 );
1325 
1326  ChunkList *list = new ChunkList();
1327  list->push_back( ChunkInfo( 0, size, (char*)buffer ) );
1328 
1329  MessageSendParams params;
1330  params.timeout = timeout;
1331  params.followRedirects = false;
1332  params.stateful = true;
1333  params.chunkList = list;
1334 
1336 
1338  StatefulHandler *stHandler = new StatefulHandler( self, handler, msg, params );
1339 
1340  return SendOrQueue( self, *self->pDataServer, msg, stHandler, params );
1341  }
1342 
1343  //----------------------------------------------------------------------------
1344  // Write a data chunk at a given offset
1345  //----------------------------------------------------------------------------
1346  XRootDStatus FileStateHandler::Write( std::shared_ptr<FileStateHandler> &self,
1347  uint64_t offset,
1348  Buffer &&buffer,
1349  ResponseHandler *handler,
1350  time_t timeout )
1351  {
1352  //--------------------------------------------------------------------------
1353  // If the memory is not page (4KB) aligned we cannot use the kernel buffer
1354  // so fall back to normal write
1355  //--------------------------------------------------------------------------
1356  if( !XrdSys::KernelBuffer::IsPageAligned( buffer.GetBuffer() ) || self->pIsChannelEncrypted )
1357  {
1358  Log *log = DefaultEnv::GetLog();
1359  log->Info( FileMsg, "[%p@%s] Buffer for handle %#x is not page aligned (4KB), "
1360  "cannot convert it to kernel space buffer.", (void*)self.get(),
1361  self->pFileUrl->GetObfuscatedURL().c_str(), *((uint32_t*)self->pFileHandle) );
1362 
1363  void *buff = buffer.GetBuffer();
1364  uint32_t size = buffer.GetSize();
1365  ReleaseBufferHandler *wrtHandler =
1366  new ReleaseBufferHandler( std::move( buffer ), handler );
1367  XRootDStatus st = self->Write( self, offset, size, buff, wrtHandler, timeout );
1368  if( !st.IsOK() )
1369  {
1370  buffer = std::move( wrtHandler->GetBuffer() );
1371  delete wrtHandler;
1372  }
1373  return st;
1374  }
1375 
1376  //--------------------------------------------------------------------------
1377  // Transfer the data from user space to kernel space
1378  //--------------------------------------------------------------------------
1379  uint32_t length = buffer.GetSize();
1380  char *ubuff = buffer.Release();
1381 
1382  std::unique_ptr<XrdSys::KernelBuffer> kbuff( new XrdSys::KernelBuffer() );
1383  ssize_t ret = XrdSys::Move( ubuff, *kbuff, length );
1384  if( ret < 0 )
1385  return XRootDStatus( stError, errInternal, XProtocol::mapError( errno ) );
1386 
1387  //--------------------------------------------------------------------------
1388  // Now create a write request and enqueue it
1389  //--------------------------------------------------------------------------
1390  return WriteKernelBuffer( self, offset, ret, std::move( kbuff ), handler, timeout );
1391  }
1392 
1393  //----------------------------------------------------------------------------
1394  // Write a data from a given file descriptor at a given offset - async
1395  //----------------------------------------------------------------------------
1396  XRootDStatus FileStateHandler::Write( std::shared_ptr<FileStateHandler> &self,
1397  uint64_t offset,
1398  uint32_t size,
1399  Optional<uint64_t> fdoff,
1400  int fd,
1401  ResponseHandler *handler,
1402  time_t timeout )
1403  {
1404  //--------------------------------------------------------------------------
1405  // Read the data from the file descriptor into a kernel buffer
1406  //--------------------------------------------------------------------------
1407  std::unique_ptr<XrdSys::KernelBuffer> kbuff( new XrdSys::KernelBuffer() );
1408  ssize_t ret = fdoff ? XrdSys::Read( fd, *kbuff, size, *fdoff ) :
1409  XrdSys::Read( fd, *kbuff, size );
1410  if( ret < 0 )
1411  return XRootDStatus( stError, errInternal, XProtocol::mapError( errno ) );
1412 
1413  //--------------------------------------------------------------------------
1414  // Now create a write request and enqueue it
1415  //--------------------------------------------------------------------------
1416  return WriteKernelBuffer( self, offset, ret, std::move( kbuff ), handler, timeout );
1417  }
1418 
1419  //----------------------------------------------------------------------------
1420  // Write number of pages at a given offset - async
1421  //----------------------------------------------------------------------------
1422  XRootDStatus FileStateHandler::PgWrite( std::shared_ptr<FileStateHandler> &self,
1423  uint64_t offset,
1424  uint32_t size,
1425  const void *buffer,
1426  std::vector<uint32_t> &cksums,
1427  ResponseHandler *handler,
1428  time_t timeout )
1429  {
1430  //--------------------------------------------------------------------------
1431  // Resolve timeout value
1432  //--------------------------------------------------------------------------
1433  if( timeout == 0 )
1434  {
1435  int val = DefaultRequestTimeout;
1436  XrdCl::DefaultEnv::GetEnv()->GetInt( "RequestTimeout", val );
1437  timeout = val;
1438  }
1439 
1440  //--------------------------------------------------------------------------
1441  // Validate the digest vector size
1442  //--------------------------------------------------------------------------
1443  if( cksums.empty() )
1444  {
1445  const char *data = static_cast<const char*>( buffer );
1446  XrdOucPgrwUtils::csCalc( data, offset, size, cksums );
1447  }
1448  else
1449  {
1450  size_t crc32cCnt = XrdOucPgrwUtils::csNum( offset, size );
1451  if( crc32cCnt != cksums.size() )
1452  return XRootDStatus( stError, errInvalidArgs, 0, "Wrong number of crc32c digests." );
1453  }
1454 
1455  //--------------------------------------------------------------------------
1456  // Create a context for PgWrite operation
1457  //--------------------------------------------------------------------------
1458  struct pgwrt_t
1459  {
1460  pgwrt_t( ResponseHandler *h ) : handler( h ), status( nullptr )
1461  {
1462  }
1463 
1464  ~pgwrt_t()
1465  {
1466  if( handler )
1467  {
1468  // if all retries were successful no error status was set
1469  if( !status ) status = new XRootDStatus();
1470  handler->HandleResponse( status, nullptr );
1471  }
1472  }
1473 
1474  static size_t GetPgNb( uint64_t pgoff, uint64_t offset, uint32_t fstpglen )
1475  {
1476  if( pgoff == offset ) return 0; // we need this if statement because we operate on unsigned integers
1477  return ( pgoff - ( offset + fstpglen ) ) / XrdSys::PageSize + 1;
1478  }
1479 
1480  inline void SetStatus( XRootDStatus* s )
1481  {
1482  if( !status ) status = s;
1483  else delete s;
1484  }
1485 
1486  ResponseHandler *handler;
1487  XRootDStatus *status;
1488  };
1489  auto pgwrt = std::make_shared<pgwrt_t>( handler );
1490 
1491  int fLen, lLen;
1492  XrdOucPgrwUtils::csNum( offset, size, fLen, lLen );
1493  uint32_t fstpglen = fLen;
1494 
1495  time_t start = ::time( nullptr );
1496  auto h = ResponseHandler::Wrap( [=]( XrdCl::XRootDStatus *s, XrdCl::AnyObject *r ) mutable
1497  {
1498  std::unique_ptr<AnyObject> scoped( r );
1499  // if the request failed simply pass the status to the
1500  // user handler
1501  if( !s->IsOK() )
1502  {
1503  pgwrt->SetStatus( s );
1504  return; // pgwrt destructor will call the handler
1505  }
1506  // also if the request was sucessful and there were no
1507  // corrupted pages pass the status to the user handler
1508  RetryInfo *inf = nullptr;
1509  r->Get( inf );
1510  if( !inf->NeedRetry() )
1511  {
1512  pgwrt->SetStatus( s );
1513  return; // pgwrt destructor will call the handler
1514  }
1515  delete s;
1516  // first adjust the timeout value
1517  time_t elapsed = ::time( nullptr ) - start;
1518  if( elapsed >= timeout )
1519  {
1520  pgwrt->SetStatus( new XRootDStatus( stError, errOperationExpired ) );
1521  return; // pgwrt destructor will call the handler
1522  }
1523  else timeout -= elapsed;
1524  // retransmit the corrupted pages
1525  for( size_t i = 0; i < inf->Size(); ++i )
1526  {
1527  auto tpl = inf->At( i );
1528  uint64_t pgoff = std::get<0>( tpl );
1529  uint32_t pglen = std::get<1>( tpl );
1530  const void *pgbuf = static_cast<const char*>( buffer ) + ( pgoff - offset );
1531  uint32_t pgdigest = cksums[pgwrt_t::GetPgNb( pgoff, offset, fstpglen )];
1532  auto h = ResponseHandler::Wrap( [=]( XrdCl::XRootDStatus *s, XrdCl::AnyObject *r ) mutable
1533  {
1534  std::unique_ptr<AnyObject> scoped( r );
1535  // if we failed simply set the status
1536  if( !s->IsOK() )
1537  {
1538  pgwrt->SetStatus( s );
1539  return; // the destructor will call the handler
1540  }
1541  delete s;
1542  // otherwise check if the data were not corrupted again
1543  RetryInfo *inf = nullptr;
1544  r->Get( inf );
1545  if( inf->NeedRetry() ) // so we failed in the end
1546  {
1547  DefaultEnv::GetLog()->Warning( FileMsg, "[%p@%s] Failed retransmitting corrupted "
1548  "page: pgoff=%llu, pglen=%u, pgdigest=%u", (void*)self.get(),
1549  self->pFileUrl->GetObfuscatedURL().c_str(), (unsigned long long) pgoff, pglen, pgdigest );
1550  pgwrt->SetStatus( new XRootDStatus( stError, errDataError, 0,
1551  "Failed to retransmit corrupted page" ) );
1552  }
1553  else
1554  DefaultEnv::GetLog()->Info( FileMsg, "[%p@%s] Succesfuly retransmitted corrupted "
1555  "page: pgoff=%llu, pglen=%u, pgdigest=%u", (void*)self.get(),
1556  self->pFileUrl->GetObfuscatedURL().c_str(), (unsigned long long) pgoff, pglen, pgdigest );
1557  } );
1558  auto st = PgWriteRetry( self, pgoff, pglen, pgbuf, pgdigest, h, timeout );
1559  if( !st.IsOK() ) pgwrt->SetStatus( new XRootDStatus( st ) );
1560  DefaultEnv::GetLog()->Info( FileMsg, "[%p@%s] Retransmitting corrupted page: "
1561  "pgoff=%llu, pglen=%u, pgdigest=%u", (void*)self.get(),
1562  self->pFileUrl->GetObfuscatedURL().c_str(), (unsigned long long) pgoff, pglen, pgdigest );
1563  }
1564  } );
1565 
1566  auto st = PgWriteImpl( self, offset, size, buffer, cksums, 0, h, timeout );
1567  if( !st.IsOK() )
1568  {
1569  pgwrt->handler = nullptr;
1570  delete h;
1571  }
1572  return st;
1573  }
1574 
1575  //------------------------------------------------------------------------
1576  // Write number of pages at a given offset - async
1577  //------------------------------------------------------------------------
1578  XRootDStatus FileStateHandler::PgWriteRetry( std::shared_ptr<FileStateHandler> &self,
1579  uint64_t offset,
1580  uint32_t size,
1581  const void *buffer,
1582  uint32_t digest,
1583  ResponseHandler *handler,
1584  time_t timeout )
1585  {
1586  std::vector<uint32_t> cksums{ digest };
1587  return PgWriteImpl( self, offset, size, buffer, cksums, PgReadFlags::Retry, handler, timeout );
1588  }
1589 
1590  //------------------------------------------------------------------------
1591  // Write number of pages at a given offset - async
1592  //------------------------------------------------------------------------
1593  XRootDStatus FileStateHandler::PgWriteImpl( std::shared_ptr<FileStateHandler> &self,
1594  uint64_t offset,
1595  uint32_t size,
1596  const void *buffer,
1597  std::vector<uint32_t> &cksums,
1598  kXR_char flags,
1599  ResponseHandler *handler,
1600  time_t timeout )
1601  {
1602  XrdSysMutexHelper scopedLock( self->pMutex );
1603 
1604  if( self->pFileState == Error ) return self->pStatus;
1605 
1606  if( self->pFileState != Opened && self->pFileState != Recovering )
1607  return XRootDStatus( stError, errInvalidOp );
1608 
1609  Log *log = DefaultEnv::GetLog();
1610  log->Debug( FileMsg, "[%p@%s] Sending a pgwrite command for handle %#x to %s",
1611  (void*)self.get(), self->pFileUrl->GetObfuscatedURL().c_str(),
1612  *((uint32_t*)self->pFileHandle), self->pDataServer->GetHostId().c_str() );
1613 
1614  //--------------------------------------------------------------------------
1615  // Create the message
1616  //--------------------------------------------------------------------------
1617  Message *msg;
1618  ClientPgWriteRequest *req;
1619  MessageUtils::CreateRequest( msg, req );
1620 
1621  req->requestid = kXR_pgwrite;
1622  req->offset = offset;
1623  req->dlen = size + cksums.size() * sizeof( uint32_t );
1624  req->reqflags = flags;
1625  memcpy( req->fhandle, self->pFileHandle, 4 );
1626 
1627  ChunkList *list = new ChunkList();
1628  list->push_back( ChunkInfo( offset, size, (char*)buffer ) );
1629 
1630  MessageSendParams params;
1631  params.timeout = timeout;
1632  params.followRedirects = false;
1633  params.stateful = true;
1634  params.chunkList = list;
1635  params.crc32cDigests.swap( cksums );
1636 
1638 
1640  StatefulHandler *stHandler = new StatefulHandler( self, handler, msg, params );
1641 
1642  return SendOrQueue( self, *self->pDataServer, msg, stHandler, params );
1643  }
1644 
1645  //----------------------------------------------------------------------------
1646  // Commit all pending disk writes - async
1647  //----------------------------------------------------------------------------
1648  XRootDStatus FileStateHandler::Sync( std::shared_ptr<FileStateHandler> &self,
1649  ResponseHandler *handler,
1650  time_t timeout )
1651  {
1652  XrdSysMutexHelper scopedLock( self->pMutex );
1653 
1654  if( self->pFileState == Error ) return self->pStatus;
1655 
1656  if( self->pFileState != Opened && self->pFileState != Recovering )
1657  return XRootDStatus( stError, errInvalidOp );
1658 
1659  Log *log = DefaultEnv::GetLog();
1660  log->Debug( FileMsg, "[%p@%s] Sending a sync command for handle %#x to %s",
1661  (void*)self.get(), self->pFileUrl->GetObfuscatedURL().c_str(),
1662  *((uint32_t*)self->pFileHandle), self->pDataServer->GetHostId().c_str() );
1663 
1664  Message *msg;
1665  ClientSyncRequest *req;
1666  MessageUtils::CreateRequest( msg, req );
1667 
1668  req->requestid = kXR_sync;
1669  memcpy( req->fhandle, self->pFileHandle, 4 );
1670 
1671  MessageSendParams params;
1672  params.timeout = timeout;
1673  params.followRedirects = false;
1674  params.stateful = true;
1676 
1678  StatefulHandler *stHandler = new StatefulHandler( self, handler, msg, params );
1679 
1680  return SendOrQueue( self, *self->pDataServer, msg, stHandler, params );
1681  }
1682 
1683  //----------------------------------------------------------------------------
1684  // Truncate the file to a particular size - async
1685  //----------------------------------------------------------------------------
1686  XRootDStatus FileStateHandler::Truncate( std::shared_ptr<FileStateHandler> &self,
1687  uint64_t size,
1688  ResponseHandler *handler,
1689  time_t timeout )
1690  {
1691  XrdSysMutexHelper scopedLock( self->pMutex );
1692 
1693  if( self->pFileState == Error ) return self->pStatus;
1694 
1695  if( self->pFileState != Opened && self->pFileState != Recovering )
1696  return XRootDStatus( stError, errInvalidOp );
1697 
1698  Log *log = DefaultEnv::GetLog();
1699  log->Debug( FileMsg, "[%p@%s] Sending a truncate command for handle %#x to %s",
1700  (void*)self.get(), self->pFileUrl->GetObfuscatedURL().c_str(),
1701  *((uint32_t*)self->pFileHandle), self->pDataServer->GetHostId().c_str() );
1702 
1703  Message *msg;
1704  ClientTruncateRequest *req;
1705  MessageUtils::CreateRequest( msg, req );
1706 
1707  req->requestid = kXR_truncate;
1708  memcpy( req->fhandle, self->pFileHandle, 4 );
1709  req->offset = size;
1710 
1711  MessageSendParams params;
1712  params.timeout = timeout;
1713  params.followRedirects = false;
1714  params.stateful = true;
1716 
1718  StatefulHandler *stHandler = new StatefulHandler( self, handler, msg, params );
1719 
1720  return SendOrQueue( self, *self->pDataServer, msg, stHandler, params );
1721  }
1722 
1723  //----------------------------------------------------------------------------
1724  // Read scattered data chunks in one operation - async
1725  //----------------------------------------------------------------------------
1726  XRootDStatus FileStateHandler::VectorRead( std::shared_ptr<FileStateHandler> &self,
1727  const ChunkList &chunks,
1728  void *buffer,
1729  ResponseHandler *handler,
1730  time_t timeout )
1731  {
1732  //--------------------------------------------------------------------------
1733  // Sanity check
1734  //--------------------------------------------------------------------------
1735  XrdSysMutexHelper scopedLock( self->pMutex );
1736 
1737  if( self->pFileState == Error ) return self->pStatus;
1738 
1739  if( self->pFileState != Opened && self->pFileState != Recovering )
1740  return XRootDStatus( stError, errInvalidOp );
1741 
1742  Log *log = DefaultEnv::GetLog();
1743  log->Debug( FileMsg, "[%p@%s] Sending a vector read command for handle %#x to %s",
1744  (void*)self.get(), self->pFileUrl->GetObfuscatedURL().c_str(),
1745  *((uint32_t*)self->pFileHandle), self->pDataServer->GetHostId().c_str() );
1746 
1747  //--------------------------------------------------------------------------
1748  // Build the message
1749  //--------------------------------------------------------------------------
1750  Message *msg;
1751  ClientReadVRequest *req;
1752  MessageUtils::CreateRequest( msg, req, sizeof(readahead_list)*chunks.size() );
1753 
1754  req->requestid = kXR_readv;
1755  req->dlen = sizeof(readahead_list)*chunks.size();
1756 
1757  ChunkList *list = new ChunkList();
1758  char *cursor = (char*)buffer;
1759 
1760  //--------------------------------------------------------------------------
1761  // Copy the chunk info
1762  //--------------------------------------------------------------------------
1763  readahead_list *dataChunk = (readahead_list*)msg->GetBuffer( 24 );
1764  for( size_t i = 0; i < chunks.size(); ++i )
1765  {
1766  dataChunk[i].rlen = chunks[i].length;
1767  dataChunk[i].offset = chunks[i].offset;
1768  memcpy( dataChunk[i].fhandle, self->pFileHandle, 4 );
1769 
1770  void *chunkBuffer;
1771  if( cursor )
1772  {
1773  chunkBuffer = cursor;
1774  cursor += chunks[i].length;
1775  }
1776  else
1777  chunkBuffer = chunks[i].buffer;
1778 
1779  list->push_back( ChunkInfo( chunks[i].offset,
1780  chunks[i].length,
1781  chunkBuffer ) );
1782  }
1783 
1784  //--------------------------------------------------------------------------
1785  // Send the message
1786  //--------------------------------------------------------------------------
1787  MessageSendParams params;
1788  params.timeout = timeout;
1789  params.followRedirects = false;
1790  params.stateful = true;
1791  params.chunkList = list;
1793 
1795  StatefulHandler *stHandler = new StatefulHandler( self, handler, msg, params );
1796 
1797  return SendOrQueue( self, *self->pDataServer, msg, stHandler, params );
1798  }
1799 
1800  //------------------------------------------------------------------------
1801  // Write scattered data chunks in one operation - async
1802  //------------------------------------------------------------------------
1803  XRootDStatus FileStateHandler::VectorWrite( std::shared_ptr<FileStateHandler> &self,
1804  const ChunkList &chunks,
1805  ResponseHandler *handler,
1806  time_t timeout )
1807  {
1808  //--------------------------------------------------------------------------
1809  // Sanity check
1810  //--------------------------------------------------------------------------
1811  XrdSysMutexHelper scopedLock( self->pMutex );
1812 
1813  if( self->pFileState == Error ) return self->pStatus;
1814 
1815  if( self->pFileState != Opened && self->pFileState != Recovering )
1816  return XRootDStatus( stError, errInvalidOp );
1817 
1818  Log *log = DefaultEnv::GetLog();
1819  log->Debug( FileMsg, "[%p@%s] Sending a vector write command for handle %#x to %s",
1820  (void*)self.get(), self->pFileUrl->GetObfuscatedURL().c_str(),
1821  *((uint32_t*)self->pFileHandle), self->pDataServer->GetHostId().c_str() );
1822 
1823  //--------------------------------------------------------------------------
1824  // Determine the size of the payload
1825  //--------------------------------------------------------------------------
1826 
1827  // the size of write vector
1828  uint32_t payloadSize = sizeof(XrdProto::write_list) * chunks.size();
1829 
1830  //--------------------------------------------------------------------------
1831  // Build the message
1832  //--------------------------------------------------------------------------
1833  Message *msg;
1834  ClientWriteVRequest *req;
1835  MessageUtils::CreateRequest( msg, req, payloadSize );
1836 
1837  req->requestid = kXR_writev;
1838  req->dlen = sizeof(XrdProto::write_list) * chunks.size();
1839 
1840  ChunkList *list = new ChunkList();
1841 
1842  //--------------------------------------------------------------------------
1843  // Copy the chunk info
1844  //--------------------------------------------------------------------------
1845  XrdProto::write_list *writeList =
1846  reinterpret_cast<XrdProto::write_list*>( msg->GetBuffer( 24 ) );
1847 
1848 
1849 
1850  for( size_t i = 0; i < chunks.size(); ++i )
1851  {
1852  writeList[i].wlen = chunks[i].length;
1853  writeList[i].offset = chunks[i].offset;
1854  memcpy( writeList[i].fhandle, self->pFileHandle, 4 );
1855 
1856  list->push_back( ChunkInfo( chunks[i].offset,
1857  chunks[i].length,
1858  chunks[i].buffer ) );
1859  }
1860 
1861  //--------------------------------------------------------------------------
1862  // Send the message
1863  //--------------------------------------------------------------------------
1864  MessageSendParams params;
1865  params.timeout = timeout;
1866  params.followRedirects = false;
1867  params.stateful = true;
1868  params.chunkList = list;
1870 
1872  StatefulHandler *stHandler = new StatefulHandler( self, handler, msg, params );
1873 
1874  return SendOrQueue( self, *self->pDataServer, msg, stHandler, params );
1875  }
1876 
1877  //------------------------------------------------------------------------
1878  // Write scattered buffers in one operation - async
1879  //------------------------------------------------------------------------
1880  XRootDStatus FileStateHandler::WriteV( std::shared_ptr<FileStateHandler> &self,
1881  uint64_t offset,
1882  const struct iovec *iov,
1883  int iovcnt,
1884  ResponseHandler *handler,
1885  time_t timeout )
1886  {
1887  XrdSysMutexHelper scopedLock( self->pMutex );
1888 
1889  if( self->pFileState == Error ) return self->pStatus;
1890 
1891  if( self->pFileState != Opened && self->pFileState != Recovering )
1892  return XRootDStatus( stError, errInvalidOp );
1893 
1894  Log *log = DefaultEnv::GetLog();
1895  log->Debug( FileMsg, "[%p@%s] Sending a write command for handle %#x to %s",
1896  (void*)self.get(), self->pFileUrl->GetObfuscatedURL().c_str(),
1897  *((uint32_t*)self->pFileHandle), self->pDataServer->GetHostId().c_str() );
1898 
1899  Message *msg;
1900  ClientWriteRequest *req;
1901  MessageUtils::CreateRequest( msg, req );
1902 
1903  ChunkList *list = new ChunkList();
1904 
1905  uint32_t size = 0;
1906  for( int i = 0; i < iovcnt; ++i )
1907  {
1908  if( iov[i].iov_len == 0 ) continue;
1909  size += iov[i].iov_len;
1910  list->push_back( ChunkInfo( 0, iov[i].iov_len,
1911  (char*)iov[i].iov_base ) );
1912  }
1913 
1914  req->requestid = kXR_write;
1915  req->offset = offset;
1916  req->dlen = size;
1917  memcpy( req->fhandle, self->pFileHandle, 4 );
1918 
1919  MessageSendParams params;
1920  params.timeout = timeout;
1921  params.followRedirects = false;
1922  params.stateful = true;
1923  params.chunkList = list;
1924 
1926 
1928  StatefulHandler *stHandler = new StatefulHandler( self, handler, msg, params );
1929 
1930  return SendOrQueue( self, *self->pDataServer, msg, stHandler, params );
1931  }
1932 
1933  //------------------------------------------------------------------------
1934  // Read data into scattered buffers in one operation - async
1935  //------------------------------------------------------------------------
1936  XRootDStatus FileStateHandler::ReadV( std::shared_ptr<FileStateHandler> &self,
1937  uint64_t offset,
1938  struct iovec *iov,
1939  int iovcnt,
1940  ResponseHandler *handler,
1941  time_t timeout )
1942  {
1943  XrdSysMutexHelper scopedLock( self->pMutex );
1944 
1945  if( self->pFileState == Error ) return self->pStatus;
1946 
1947  if( self->pFileState != Opened && self->pFileState != Recovering )
1948  return XRootDStatus( stError, errInvalidOp );
1949 
1950  Log *log = DefaultEnv::GetLog();
1951  log->Debug( FileMsg, "[%p@%s] Sending a read command for handle %#x to %s",
1952  (void*)self.get(), self->pFileUrl->GetObfuscatedURL().c_str(),
1953  *((uint32_t*)self->pFileHandle), self->pDataServer->GetHostId().c_str() );
1954 
1955  Message *msg;
1956  ClientReadRequest *req;
1957  MessageUtils::CreateRequest( msg, req );
1958 
1959  // calculate the total read size
1960  size_t size = std::accumulate( iov, iov + iovcnt, 0, []( size_t acc, iovec &rhs )
1961  {
1962  return acc + rhs.iov_len;
1963  } );
1964  req->requestid = kXR_read;
1965  req->offset = offset;
1966  req->rlen = size;
1967  msg->SetVirtReqID( kXR_virtReadv );
1968  memcpy( req->fhandle, self->pFileHandle, 4 );
1969 
1970  ChunkList *list = new ChunkList();
1971  list->reserve( iovcnt );
1972  uint64_t choff = offset;
1973  for( int i = 0; i < iovcnt; ++i )
1974  {
1975  list->emplace_back( choff, iov[i].iov_len, iov[i].iov_base );
1976  choff += iov[i].iov_len;
1977  }
1978 
1980  MessageSendParams params;
1981  params.timeout = timeout;
1982  params.followRedirects = false;
1983  params.stateful = true;
1984  params.chunkList = list;
1986  StatefulHandler *stHandler = new StatefulHandler( self, handler, msg, params );
1987 
1988  return SendOrQueue( self, *self->pDataServer, msg, stHandler, params );
1989  }
1990 
1991 
1992  //----------------------------------------------------------------------------
1993  // Performs a custom operation on an open file, server implementation
1994  // dependent - async
1995  //----------------------------------------------------------------------------
1996  XRootDStatus FileStateHandler::Fcntl( std::shared_ptr<FileStateHandler> &self,
1997  QueryCode::Code queryCode,
1998  const Buffer &arg,
1999  ResponseHandler *handler,
2000  time_t timeout )
2001  {
2002  XrdSysMutexHelper scopedLock( self->pMutex );
2003 
2004  if( self->pFileState == Error ) return self->pStatus;
2005 
2006  if( self->pFileState != Opened && self->pFileState != Recovering )
2007  return XRootDStatus( stError, errInvalidOp );
2008 
2009  Log *log = DefaultEnv::GetLog();
2010  log->Debug( FileMsg, "[%p@%s] Sending a fcntl command for handle %#x to %s",
2011  (void*)self.get(), self->pFileUrl->GetObfuscatedURL().c_str(),
2012  *((uint32_t*)self->pFileHandle), self->pDataServer->GetHostId().c_str() );
2013 
2014  Message *msg;
2015  ClientQueryRequest *req;
2016  MessageUtils::CreateRequest( msg, req, arg.GetSize() );
2017 
2018  req->requestid = kXR_query;
2019  req->infotype = queryCode;
2020  req->dlen = arg.GetSize();
2021  memcpy( req->fhandle, self->pFileHandle, 4 );
2022  msg->Append( arg.GetBuffer(), arg.GetSize(), sizeof(ClientQueryRequest) );
2023 
2024  MessageSendParams params;
2025  params.timeout = timeout;
2026  params.followRedirects = false;
2027  params.stateful = true;
2029 
2031  StatefulHandler *stHandler = new StatefulHandler( self, handler, msg, params );
2032 
2033  return SendOrQueue( self, *self->pDataServer, msg, stHandler, params );
2034  }
2035 
2036  //----------------------------------------------------------------------------
2037  // Get access token to a file - async
2038  //----------------------------------------------------------------------------
2039  XRootDStatus FileStateHandler::Visa( std::shared_ptr<FileStateHandler> &self,
2040  ResponseHandler *handler,
2041  time_t timeout )
2042  {
2043  XrdSysMutexHelper scopedLock( self->pMutex );
2044 
2045  if( self->pFileState == Error ) return self->pStatus;
2046 
2047  if( self->pFileState != Opened && self->pFileState != Recovering )
2048  return XRootDStatus( stError, errInvalidOp );
2049 
2050  Log *log = DefaultEnv::GetLog();
2051  log->Debug( FileMsg, "[%p@%s] Sending a visa command for handle %#x to %s",
2052  (void*)self.get(), self->pFileUrl->GetObfuscatedURL().c_str(),
2053  *((uint32_t*)self->pFileHandle), self->pDataServer->GetHostId().c_str() );
2054 
2055  Message *msg;
2056  ClientQueryRequest *req;
2057  MessageUtils::CreateRequest( msg, req );
2058 
2059  req->requestid = kXR_query;
2060  req->infotype = kXR_Qvisa;
2061  memcpy( req->fhandle, self->pFileHandle, 4 );
2062 
2063  MessageSendParams params;
2064  params.timeout = timeout;
2065  params.followRedirects = false;
2066  params.stateful = true;
2068 
2070  StatefulHandler *stHandler = new StatefulHandler( self, handler, msg, params );
2071 
2072  return SendOrQueue( self, *self->pDataServer, msg, stHandler, params );
2073  }
2074 
2075  //------------------------------------------------------------------------
2076  // Set extended attributes - async
2077  //------------------------------------------------------------------------
2078  XRootDStatus FileStateHandler::SetXAttr( std::shared_ptr<FileStateHandler> &self,
2079  const std::vector<xattr_t> &attrs,
2080  ResponseHandler *handler,
2081  time_t timeout )
2082  {
2083  XrdSysMutexHelper scopedLock( self->pMutex );
2084 
2085  if( self->pFileState == Error ) return self->pStatus;
2086 
2087  if( self->pFileState != Opened && self->pFileState != Recovering )
2088  return XRootDStatus( stError, errInvalidOp );
2089 
2090  Log *log = DefaultEnv::GetLog();
2091  log->Debug( FileMsg, "[%p@%s] Sending a fattr set command for handle %#x to %s",
2092  (void*)self.get(), self->pFileUrl->GetObfuscatedURL().c_str(),
2093  *((uint32_t*)self->pFileHandle), self->pDataServer->GetHostId().c_str() );
2094 
2095  //--------------------------------------------------------------------------
2096  // Issue a new fattr get request
2097  //--------------------------------------------------------------------------
2098  return XAttrOperationImpl( self, kXR_fattrSet, 0, attrs, handler, timeout );
2099  }
2100 
2101  //------------------------------------------------------------------------
2102  // Get extended attributes - async
2103  //------------------------------------------------------------------------
2104  XRootDStatus FileStateHandler::GetXAttr( std::shared_ptr<FileStateHandler> &self,
2105  const std::vector<std::string> &attrs,
2106  ResponseHandler *handler,
2107  time_t timeout )
2108  {
2109  XrdSysMutexHelper scopedLock( self->pMutex );
2110 
2111  if( self->pFileState == Error ) return self->pStatus;
2112 
2113  if( self->pFileState != Opened && self->pFileState != Recovering )
2114  return XRootDStatus( stError, errInvalidOp );
2115 
2116  Log *log = DefaultEnv::GetLog();
2117  log->Debug( FileMsg, "[%p@%s] Sending a fattr get command for handle %#x to %s",
2118  (void*)self.get(), self->pFileUrl->GetObfuscatedURL().c_str(),
2119  *((uint32_t*)self->pFileHandle), self->pDataServer->GetHostId().c_str() );
2120 
2121  //--------------------------------------------------------------------------
2122  // Issue a new fattr get request
2123  //--------------------------------------------------------------------------
2124  return XAttrOperationImpl( self, kXR_fattrGet, 0, attrs, handler, timeout );
2125  }
2126 
2127  //------------------------------------------------------------------------
2128  // Delete extended attributes - async
2129  //------------------------------------------------------------------------
2130  XRootDStatus FileStateHandler::DelXAttr( std::shared_ptr<FileStateHandler> &self,
2131  const std::vector<std::string> &attrs,
2132  ResponseHandler *handler,
2133  time_t timeout )
2134  {
2135  XrdSysMutexHelper scopedLock( self->pMutex );
2136 
2137  if( self->pFileState == Error ) return self->pStatus;
2138 
2139  if( self->pFileState != Opened && self->pFileState != Recovering )
2140  return XRootDStatus( stError, errInvalidOp );
2141 
2142  Log *log = DefaultEnv::GetLog();
2143  log->Debug( FileMsg, "[%p@%s] Sending a fattr del command for handle %#x to %s",
2144  (void*)self.get(), self->pFileUrl->GetObfuscatedURL().c_str(),
2145  *((uint32_t*)self->pFileHandle), self->pDataServer->GetHostId().c_str() );
2146 
2147  //--------------------------------------------------------------------------
2148  // Issue a new fattr del request
2149  //--------------------------------------------------------------------------
2150  return XAttrOperationImpl( self, kXR_fattrDel, 0, attrs, handler, timeout );
2151  }
2152 
2153  //------------------------------------------------------------------------
2154  // List extended attributes - async
2155  //------------------------------------------------------------------------
2156  XRootDStatus FileStateHandler::ListXAttr( std::shared_ptr<FileStateHandler> &self,
2157  ResponseHandler *handler,
2158  time_t timeout )
2159  {
2160  XrdSysMutexHelper scopedLock( self->pMutex );
2161 
2162  if( self->pFileState == Error ) return self->pStatus;
2163 
2164  if( self->pFileState != Opened && self->pFileState != Recovering )
2165  return XRootDStatus( stError, errInvalidOp );
2166 
2167  Log *log = DefaultEnv::GetLog();
2168  log->Debug( FileMsg, "[%p@%s] Sending a fattr list command for handle %#x to %s",
2169  (void*)self.get(), self->pFileUrl->GetObfuscatedURL().c_str(),
2170  *((uint32_t*)self->pFileHandle), self->pDataServer->GetHostId().c_str() );
2171 
2172  //--------------------------------------------------------------------------
2173  // Issue a new fattr get request
2174  //--------------------------------------------------------------------------
2175  static const std::vector<std::string> nothing;
2176  return XAttrOperationImpl( self, kXR_fattrList, ClientFattrRequest::aData,
2177  nothing, handler, timeout );
2178  }
2179 
2180  //------------------------------------------------------------------------
2190  //------------------------------------------------------------------------
2191  XRootDStatus FileStateHandler::Checkpoint( std::shared_ptr<FileStateHandler> &self,
2192  kXR_char code,
2193  ResponseHandler *handler,
2194  time_t timeout )
2195  {
2196  XrdSysMutexHelper scopedLock( self->pMutex );
2197 
2198  if( self->pFileState == Error ) return self->pStatus;
2199 
2200  if( self->pFileState != Opened && self->pFileState != Recovering )
2201  return XRootDStatus( stError, errInvalidOp );
2202 
2203  Log *log = DefaultEnv::GetLog();
2204  log->Debug( FileMsg, "[%p@%s] Sending a checkpoint command for handle %#x to %s",
2205  (void*)self.get(), self->pFileUrl->GetObfuscatedURL().c_str(),
2206  *((uint32_t*)self->pFileHandle), self->pDataServer->GetHostId().c_str() );
2207 
2208  Message *msg;
2209  ClientChkPointRequest *req;
2210  MessageUtils::CreateRequest( msg, req );
2211 
2212  req->requestid = kXR_chkpoint;
2213  req->opcode = code;
2214  memcpy( req->fhandle, self->pFileHandle, 4 );
2215 
2216  MessageSendParams params;
2217  params.timeout = timeout;
2218  params.followRedirects = false;
2219  params.stateful = true;
2220 
2222 
2224  StatefulHandler *stHandler = new StatefulHandler( self, handler, msg, params );
2225 
2226  return SendOrQueue( self, *self->pDataServer, msg, stHandler, params );
2227  }
2228 
2229  //------------------------------------------------------------------------
2239  //------------------------------------------------------------------------
2240  XRootDStatus FileStateHandler::ChkptWrt( std::shared_ptr<FileStateHandler> &self,
2241  uint64_t offset,
2242  uint32_t size,
2243  const void *buffer,
2244  ResponseHandler *handler,
2245  time_t timeout )
2246  {
2247  XrdSysMutexHelper scopedLock( self->pMutex );
2248 
2249  if( self->pFileState == Error ) return self->pStatus;
2250 
2251  if( self->pFileState != Opened && self->pFileState != Recovering )
2252  return XRootDStatus( stError, errInvalidOp );
2253 
2254  Log *log = DefaultEnv::GetLog();
2255  log->Debug( FileMsg, "[%p@%s] Sending a write command for handle %#x to %s",
2256  (void*)self.get(), self->pFileUrl->GetObfuscatedURL().c_str(),
2257  *((uint32_t*)self->pFileHandle), self->pDataServer->GetHostId().c_str() );
2258 
2259  Message *msg;
2260  ClientChkPointRequest *req;
2261  MessageUtils::CreateRequest( msg, req, sizeof( ClientWriteRequest ) );
2262 
2263  req->requestid = kXR_chkpoint;
2264  req->opcode = kXR_ckpXeq;
2265  req->dlen = 24; // as specified in the protocol specification
2266  memcpy( req->fhandle, self->pFileHandle, 4 );
2267 
2269  wrtreq->requestid = kXR_write;
2270  wrtreq->offset = offset;
2271  wrtreq->dlen = size;
2272  memcpy( wrtreq->fhandle, self->pFileHandle, 4 );
2273 
2274  ChunkList *list = new ChunkList();
2275  list->push_back( ChunkInfo( 0, size, (char*)buffer ) );
2276 
2277  MessageSendParams params;
2278  params.timeout = timeout;
2279  params.followRedirects = false;
2280  params.stateful = true;
2281  params.chunkList = list;
2282 
2284 
2286  StatefulHandler *stHandler = new StatefulHandler( self, handler, msg, params );
2287 
2288  return SendOrQueue( self, *self->pDataServer, msg, stHandler, params );
2289  }
2290 
2291  //------------------------------------------------------------------------
2301  //------------------------------------------------------------------------
2302  XRootDStatus FileStateHandler::ChkptWrtV( std::shared_ptr<FileStateHandler> &self,
2303  uint64_t offset,
2304  const struct iovec *iov,
2305  int iovcnt,
2306  ResponseHandler *handler,
2307  time_t timeout )
2308  {
2309  XrdSysMutexHelper scopedLock( self->pMutex );
2310 
2311  if( self->pFileState == Error ) return self->pStatus;
2312 
2313  if( self->pFileState != Opened && self->pFileState != Recovering )
2314  return XRootDStatus( stError, errInvalidOp );
2315 
2316  Log *log = DefaultEnv::GetLog();
2317  log->Debug( FileMsg, "[%p@%s] Sending a write command for handle %#x to %s",
2318  (void*)self.get(), self->pFileUrl->GetObfuscatedURL().c_str(),
2319  *((uint32_t*)self->pFileHandle), self->pDataServer->GetHostId().c_str() );
2320 
2321  Message *msg;
2322  ClientChkPointRequest *req;
2323  MessageUtils::CreateRequest( msg, req, sizeof( ClientWriteRequest ) );
2324 
2325  req->requestid = kXR_chkpoint;
2326  req->opcode = kXR_ckpXeq;
2327  req->dlen = 24; // as specified in the protocol specification
2328  memcpy( req->fhandle, self->pFileHandle, 4 );
2329 
2330  ChunkList *list = new ChunkList();
2331  uint32_t size = 0;
2332  for( int i = 0; i < iovcnt; ++i )
2333  {
2334  if( iov[i].iov_len == 0 ) continue;
2335  size += iov[i].iov_len;
2336  list->push_back( ChunkInfo( 0, iov[i].iov_len,
2337  (char*)iov[i].iov_base ) );
2338  }
2339 
2341  wrtreq->requestid = kXR_write;
2342  wrtreq->offset = offset;
2343  wrtreq->dlen = size;
2344  memcpy( wrtreq->fhandle, self->pFileHandle, 4 );
2345 
2346  MessageSendParams params;
2347  params.timeout = timeout;
2348  params.followRedirects = false;
2349  params.stateful = true;
2350  params.chunkList = list;
2351 
2353 
2355  StatefulHandler *stHandler = new StatefulHandler( self, handler, msg, params );
2356 
2357  return SendOrQueue( self, *self->pDataServer, msg, stHandler, params );
2358  }
2359 
2360  //----------------------------------------------------------------------------
2361  // Check if the file is open
2362  //----------------------------------------------------------------------------
2364  {
2365  XrdSysMutexHelper scopedLock( pMutex );
2366 
2367  if( pFileState == Opened || pFileState == Recovering )
2368  return true;
2369  return false;
2370  }
2371 
2372  //----------------------------------------------------------------------------
2373  // Set file property
2374  //----------------------------------------------------------------------------
2375  bool FileStateHandler::SetProperty( const std::string &name,
2376  const std::string &value )
2377  {
2378  XrdSysMutexHelper scopedLock( pMutex );
2379  if( name == "ReadRecovery" )
2380  {
2381  if( value == "true" ) pDoRecoverRead = true;
2382  else pDoRecoverRead = false;
2383  return true;
2384  }
2385  else if( name == "WriteRecovery" )
2386  {
2387  if( value == "true" ) pDoRecoverWrite = true;
2388  else pDoRecoverWrite = false;
2389  return true;
2390  }
2391  else if( name == "FollowRedirects" )
2392  {
2393  if( value == "true" ) pFollowRedirects = true;
2394  else pFollowRedirects = false;
2395  return true;
2396  }
2397  else if( name == "BundledClose" )
2398  {
2399  if( value == "true" ) pAllowBundledClose = true;
2400  else pAllowBundledClose = false;
2401  return true;
2402  }
2403  return false;
2404  }
2405 
2406  //----------------------------------------------------------------------------
2407  // Get file property
2408  //----------------------------------------------------------------------------
2409  bool FileStateHandler::GetProperty( const std::string &name,
2410  std::string &value ) const
2411  {
2412  XrdSysMutexHelper scopedLock( pMutex );
2413  if( name == "ReadRecovery" )
2414  {
2415  if( pDoRecoverRead ) value = "true";
2416  else value = "false";
2417  return true;
2418  }
2419  else if( name == "WriteRecovery" )
2420  {
2421  if( pDoRecoverWrite ) value = "true";
2422  else value = "false";
2423  return true;
2424  }
2425  else if( name == "FollowRedirects" )
2426  {
2427  if( pFollowRedirects ) value = "true";
2428  else value = "false";
2429  return true;
2430  }
2431  else if( name == "DataServer" && pDataServer )
2432  { value = pDataServer->GetHostId(); return true; }
2433  else if( name == "LastURL" && pDataServer )
2434  { value = pDataServer->GetURL(); return true; }
2435  else if( name == "WrtRecoveryRedir" && pWrtRecoveryRedir )
2436  { value = pWrtRecoveryRedir->GetHostId(); return true; }
2437  value = "";
2438  return false;
2439  }
2440 
2441  //----------------------------------------------------------------------------
2442  // Process the results of the opening operation
2443  //----------------------------------------------------------------------------
2445  const OpenInfo *openInfo,
2446  const HostList *hostList )
2447  {
2448  Log *log = DefaultEnv::GetLog();
2449  XrdSysMutexHelper scopedLock( pMutex );
2450 
2451  //--------------------------------------------------------------------------
2452  // Assign the data server and the load balancer
2453  //--------------------------------------------------------------------------
2454  std::string lastServer = pFileUrl->GetHostId();
2455  if( hostList )
2456  {
2457  delete pDataServer;
2458  delete pLoadBalancer;
2459  pLoadBalancer = 0;
2460  delete pWrtRecoveryRedir;
2461  pWrtRecoveryRedir = 0;
2462 
2463  pDataServer = new URL( hostList->back().url );
2464  pDataServer->SetParams( pFileUrl->GetParams() );
2465  if( !( pUseVirtRedirector && pFileUrl->IsMetalink() ) ) pDataServer->SetPath( pFileUrl->GetPath() );
2466  lastServer = pDataServer->GetHostId();
2467  HostList::const_iterator itC;
2468  URL::ParamsMap params = pDataServer->GetParams();
2469  for( itC = hostList->begin(); itC != hostList->end(); ++itC )
2470  {
2471  MessageUtils::MergeCGI( params,
2472  itC->url.GetParams(),
2473  true );
2474  }
2475  pDataServer->SetParams( params );
2476 
2477  HostList::const_reverse_iterator it;
2478  for( it = hostList->rbegin(); it != hostList->rend(); ++it )
2479  if( it->loadBalancer )
2480  {
2481  pLoadBalancer = new URL( it->url );
2482  break;
2483  }
2484 
2485  for( it = hostList->rbegin(); it != hostList->rend(); ++it )
2486  if( it->flags & kXR_recoverWrts )
2487  {
2488  pWrtRecoveryRedir = new URL( it->url );
2489  break;
2490  }
2491  }
2492 
2493  log->Debug(FileMsg, "[%p@%s] Open has returned with status %s",
2494  (void*)this, pFileUrl->GetObfuscatedURL().c_str(), status->ToStr().c_str() );
2495 
2496  if( pDataServer && !pDataServer->IsLocalFile() )
2497  {
2498  //------------------------------------------------------------------------
2499  // Check if we are using a secure connection
2500  //------------------------------------------------------------------------
2501  XrdCl::AnyObject isencobj;
2503  QueryTransport( *pDataServer, XRootDQuery::IsEncrypted, isencobj );
2504  if( st.IsOK() )
2505  {
2506  bool *isenc;
2507  isencobj.Get( isenc );
2508  pIsChannelEncrypted = isenc ? *isenc : false;
2509  delete isenc;
2510  }
2511  }
2512 
2513  //--------------------------------------------------------------------------
2514  // We have failed
2515  //--------------------------------------------------------------------------
2516  pStatus = *status;
2517  if( !pStatus.IsOK() || !openInfo )
2518  {
2519  log->Debug(FileMsg, "[%p@%s] Error while opening at %s: %s",
2520  (void*)this, pFileUrl->GetObfuscatedURL().c_str(), lastServer.c_str(),
2521  pStatus.ToStr().c_str() );
2522  FailQueuedMessages( pStatus );
2523  pFileState = Error;
2524 
2525  //------------------------------------------------------------------------
2526  // Report to monitoring
2527  //------------------------------------------------------------------------
2529  if( mon )
2530  {
2532  i.file = pFileUrl;
2533  i.status = status;
2535  mon->Event( Monitor::EvErrIO, &i );
2536  }
2537  }
2538  //--------------------------------------------------------------------------
2539  // We have succeeded
2540  //--------------------------------------------------------------------------
2541  else
2542  {
2543  //------------------------------------------------------------------------
2544  // if requested file colocation or dup was done, don't do again on reopen
2545  //------------------------------------------------------------------------
2546  pOpenFlags &= ~(OpenFlags::Dup | OpenFlags::Samefs);
2547 
2548  //------------------------------------------------------------------------
2549  // Store the response info
2550  //------------------------------------------------------------------------
2551  openInfo->GetFileHandle( pFileHandle );
2552  pSessionId = openInfo->GetSessionId();
2553  if( openInfo->GetStatInfo() )
2554  {
2555  delete pStatInfo;
2556  pStatInfo = new StatInfo( *openInfo->GetStatInfo() );
2557  }
2558 
2559  log->Debug( FileMsg, "[%p@%s] successfully opened at %s, handle: %#x, "
2560  "session id: %llu", (void*)this, pFileUrl->GetObfuscatedURL().c_str(),
2561  pDataServer->GetHostId().c_str(), *((uint32_t*)pFileHandle),
2562  (unsigned long long) pSessionId );
2563 
2564  //------------------------------------------------------------------------
2565  // Inform the monitoring about opening success
2566  //------------------------------------------------------------------------
2567  gettimeofday( &pOpenTime, 0 );
2569  if( mon )
2570  {
2572  i.file = pFileUrl;
2573  i.dataServer = pDataServer->GetHostId();
2574  i.oFlags = pOpenFlags;
2575  i.oFlags2 = pOpenFlags>>16;
2576  i.fSize = pStatInfo ? pStatInfo->GetSize() : 0;
2577  mon->Event( Monitor::EvOpen, &i );
2578  }
2579 
2580  //------------------------------------------------------------------------
2581  // Resend the queued messages if any
2582  //------------------------------------------------------------------------
2583  ReSendQueuedMessages();
2584  pFileState = Opened;
2585  }
2586  }
2587 
2588  //----------------------------------------------------------------------------
2589  // Process the results of the closing operation
2590  //----------------------------------------------------------------------------
2592  {
2593  Log *log = DefaultEnv::GetLog();
2594  XrdSysMutexHelper scopedLock( pMutex );
2595 
2596  log->Debug(FileMsg, "[%p@%s] Close returned from %s with: %s", (void*)this,
2597  pFileUrl->GetObfuscatedURL().c_str(), pDataServer->GetHostId().c_str(),
2598  status->ToStr().c_str() );
2599 
2600  log->Dump(FileMsg, "[%p@%s] Items in the fly %zu, queued for recovery %zu",
2601  (void*)this, pFileUrl->GetObfuscatedURL().c_str(), pInTheFly.size(), pToBeRecovered.size() );
2602 
2603  MonitorClose( status );
2604  ResetMonitoringVars();
2605 
2606  pStatus = *status;
2607  pFileState = Closed;
2608  }
2609 
2610  //----------------------------------------------------------------------------
2611  // Handle an error while sending a stateful message
2612  //----------------------------------------------------------------------------
2613  void FileStateHandler::OnStateError( std::shared_ptr<FileStateHandler> &self,
2614  XRootDStatus *status,
2615  Message *message,
2616  ResponseHandler *userHandler,
2617  MessageSendParams &sendParams )
2618  {
2619  //--------------------------------------------------------------------------
2620  // It may be a redirection
2621  //--------------------------------------------------------------------------
2622  if( !status->IsOK() && status->code == errRedirect && self->pFollowRedirects )
2623  {
2624  static const std::string root = "root", xroot = "xroot", file = "file",
2625  roots = "roots", xroots = "xroots";
2626  std::string msg = status->GetErrorMessage();
2627  if( !msg.compare( 0, root.size(), root ) ||
2628  !msg.compare( 0, xroot.size(), xroot ) ||
2629  !msg.compare( 0, file.size(), file ) ||
2630  !msg.compare( 0, roots.size(), roots ) ||
2631  !msg.compare( 0, xroots.size(), xroots ) )
2632  {
2633  FileStateHandler::OnStateRedirection( self, msg, message, userHandler, sendParams );
2634  return;
2635  }
2636  }
2637 
2638  //--------------------------------------------------------------------------
2639  // Handle error
2640  //--------------------------------------------------------------------------
2641  Log *log = DefaultEnv::GetLog();
2642  XrdSysMutexHelper scopedLock( self->pMutex );
2643  self->pInTheFly.erase( message );
2644 
2645  log->Dump( FileMsg, "[%p@%s] File state error encountered. Message %s "
2646  "returned with %s", (void*)self.get(), self->pFileUrl->GetObfuscatedURL().c_str(),
2647  message->GetObfuscatedDescription().c_str(), status->ToStr().c_str() );
2648 
2649  //--------------------------------------------------------------------------
2650  // Report to monitoring
2651  //--------------------------------------------------------------------------
2653  if( mon )
2654  {
2656  i.file = self->pFileUrl;
2657  i.status = status;
2658 
2659  ClientRequest *req = (ClientRequest*)message->GetBuffer();
2660  switch( req->header.requestid )
2661  {
2662  case kXR_read: i.opCode = Monitor::ErrorInfo::ErrRead; break;
2668  default: i.opCode = Monitor::ErrorInfo::ErrUnc;
2669  }
2670 
2671  mon->Event( Monitor::EvErrIO, &i );
2672  }
2673 
2674  //--------------------------------------------------------------------------
2675  // The message is not recoverable
2676  // (message using a kernel buffer is not recoverable by definition)
2677  //--------------------------------------------------------------------------
2678  if( !self->IsRecoverable( *status ) || sendParams.kbuff )
2679  {
2680  log->Error( FileMsg, "[%p@%s] Fatal file state error. Message %s "
2681  "returned with %s", (void*)self.get(), self->pFileUrl->GetObfuscatedURL().c_str(),
2682  message->GetObfuscatedDescription().c_str(), status->ToStr().c_str() );
2683 
2684  self->FailMessage( RequestData( message, userHandler, sendParams ), *status );
2685  delete status;
2686  return;
2687  }
2688 
2689  //--------------------------------------------------------------------------
2690  // Insert the message to the recovery queue and start the recovery
2691  // procedure if we don't have any more message in the fly
2692  //--------------------------------------------------------------------------
2693  self->pCloseReason = *status;
2694  RecoverMessage( self, RequestData( message, userHandler, sendParams ) );
2695  delete status;
2696  }
2697 
2698  //----------------------------------------------------------------------------
2699  // Handle stateful redirect
2700  //----------------------------------------------------------------------------
2701  void FileStateHandler::OnStateRedirection( std::shared_ptr<FileStateHandler> &self,
2702  const std::string &redirectUrl,
2703  Message *message,
2704  ResponseHandler *userHandler,
2705  MessageSendParams &sendParams )
2706  {
2707  XrdSysMutexHelper scopedLock( self->pMutex );
2708  self->pInTheFly.erase( message );
2709 
2710  //--------------------------------------------------------------------------
2711  // Register the state redirect url and append the new cgi information to
2712  // the file URL
2713  //--------------------------------------------------------------------------
2714  if( !self->pStateRedirect )
2715  {
2716  std::ostringstream o;
2717  self->pStateRedirect = new URL( redirectUrl );
2718  URL::ParamsMap params = self->pFileUrl->GetParams();
2719  MessageUtils::MergeCGI( params,
2720  self->pStateRedirect->GetParams(),
2721  false );
2722  self->pFileUrl->SetParams( params );
2723  }
2724 
2725  RecoverMessage( self, RequestData( message, userHandler, sendParams ) );
2726  }
2727 
2728  //----------------------------------------------------------------------------
2729  // Handle stateful response
2730  //----------------------------------------------------------------------------
2731  void FileStateHandler::OnStateResponse( std::shared_ptr<FileStateHandler> &self,
2732  XRootDStatus *status,
2733  Message *message,
2734  AnyObject *response,
2735  HostList */*urlList*/ )
2736  {
2737  Log *log = DefaultEnv::GetLog();
2738  XrdSysMutexHelper scopedLock( self->pMutex );
2739 
2740  log->Dump( FileMsg, "[%p@%s] Got state response for message %s",
2741  (void*)self.get(), self->pFileUrl->GetObfuscatedURL().c_str(),
2742  message->GetObfuscatedDescription().c_str() );
2743 
2744  //--------------------------------------------------------------------------
2745  // Since this message may be the last "in-the-fly" and no recovery
2746  // is done if messages are in the fly, we may need to trigger recovery
2747  //--------------------------------------------------------------------------
2748  self->pInTheFly.erase( message );
2749  RunRecovery( self );
2750 
2751  //--------------------------------------------------------------------------
2752  // Play with the actual response before returning it. This is a good
2753  // place to do caching in the future.
2754  //--------------------------------------------------------------------------
2755  ClientRequest *req = (ClientRequest*)message->GetBuffer();
2756  switch( req->header.requestid )
2757  {
2758  //------------------------------------------------------------------------
2759  // Cache the stat response
2760  //------------------------------------------------------------------------
2761  case kXR_stat:
2762  {
2763  StatInfo *info = 0;
2764  response->Get( info );
2765  delete self->pStatInfo;
2766  self->pStatInfo = new StatInfo( *info );
2767  break;
2768  }
2769 
2770  //------------------------------------------------------------------------
2771  // Handle read response
2772  //------------------------------------------------------------------------
2773  case kXR_read:
2774  {
2775  ++self->pRCount;
2776  self->pRBytes += req->read.rlen;
2777  break;
2778  }
2779 
2780  //------------------------------------------------------------------------
2781  // Handle read response
2782  //------------------------------------------------------------------------
2783  case kXR_pgread:
2784  {
2785  ++self->pRCount;
2786  self->pRBytes += req->pgread.rlen;
2787  break;
2788  }
2789 
2790  //------------------------------------------------------------------------
2791  // Handle readv response
2792  //------------------------------------------------------------------------
2793  case kXR_readv:
2794  {
2795  ++self->pVRCount;
2796  size_t segs = req->header.dlen/sizeof(readahead_list);
2797  readahead_list *dataChunk = (readahead_list*)message->GetBuffer( 24 );
2798  for( size_t i = 0; i < segs; ++i )
2799  self->pVRBytes += dataChunk[i].rlen;
2800  self->pVSegs += segs;
2801  break;
2802  }
2803 
2804  //------------------------------------------------------------------------
2805  // Handle write response
2806  //------------------------------------------------------------------------
2807  case kXR_write:
2808  {
2809  ++self->pWCount;
2810  self->pWBytes += req->write.dlen;
2811  break;
2812  }
2813 
2814  //------------------------------------------------------------------------
2815  // Handle write response
2816  //------------------------------------------------------------------------
2817  case kXR_pgwrite:
2818  {
2819  ++self->pWCount;
2820  self->pWBytes += req->pgwrite.dlen;
2821  break;
2822  }
2823 
2824  //------------------------------------------------------------------------
2825  // Handle writev response
2826  //------------------------------------------------------------------------
2827  case kXR_writev:
2828  {
2829  ++self->pVWCount;
2830  size_t size = req->header.dlen/sizeof(readahead_list);
2831  XrdProto::write_list *wrtList =
2832  reinterpret_cast<XrdProto::write_list*>( message->GetBuffer( 24 ) );
2833  for( size_t i = 0; i < size; ++i )
2834  self->pVWBytes += wrtList[i].wlen;
2835  break;
2836  }
2837  };
2838  }
2839 
2840  //------------------------------------------------------------------------
2842  //------------------------------------------------------------------------
2843  void FileStateHandler::Tick( time_t now )
2844  {
2845  if (pMutex.CondLock())
2846  {TimeOutRequests( now );
2847  pMutex.UnLock();
2848  }
2849  }
2850 
2851  //----------------------------------------------------------------------------
2852  // Declare timeout on requests being recovered
2853  //----------------------------------------------------------------------------
2855  {
2856  if( !pToBeRecovered.empty() )
2857  {
2858  Log *log = DefaultEnv::GetLog();
2859  log->Dump( FileMsg, "[%p@%s] Got a timer event", (void*)this,
2860  pFileUrl->GetObfuscatedURL().c_str() );
2861  RequestList::iterator it;
2863  for( it = pToBeRecovered.begin(); it != pToBeRecovered.end(); )
2864  {
2865  if( it->params.expires <= now )
2866  {
2867  jobMan->QueueJob( new ResponseJob(
2868  it->handler,
2870  0, it->params.hostList ) );
2871  it = pToBeRecovered.erase( it );
2872  }
2873  else
2874  ++it;
2875  }
2876  }
2877  }
2878 
2879  //----------------------------------------------------------------------------
2880  // Called in the child process after the fork
2881  //----------------------------------------------------------------------------
2883  {
2884  Log *log = DefaultEnv::GetLog();
2885 
2886  if( pFileState == Closed || pFileState == Error )
2887  return;
2888 
2889  if( (IsReadOnly() && pDoRecoverRead) ||
2890  (!IsReadOnly() && pDoRecoverWrite) )
2891  {
2892  log->Debug( FileMsg, "[%p@%s] Putting the file in recovery state in "
2893  "process %d", (void*)this, pFileUrl->GetObfuscatedURL().c_str(), getpid() );
2894  pFileState = Recovering;
2895  pInTheFly.clear();
2896  pToBeRecovered.clear();
2897  }
2898  else
2899  pFileState = Error;
2900  }
2901 
2902  //------------------------------------------------------------------------
2903  // Try other data server
2904  //------------------------------------------------------------------------
2905  XRootDStatus FileStateHandler::TryOtherServer( std::shared_ptr<FileStateHandler> &self, time_t timeout )
2906  {
2907  XrdSysMutexHelper scopedLock( self->pMutex );
2908 
2909  if( self->pFileState != Opened || !self->pLoadBalancer )
2910  return XRootDStatus( stError, errInvalidOp );
2911 
2912  self->pFileState = Recovering;
2913 
2914  Log *log = DefaultEnv::GetLog();
2915  log->Debug( FileMsg, "[%p@%s] Reopen file at next data server.",
2916  (void*)self.get(), self->pFileUrl->GetObfuscatedURL().c_str() );
2917 
2918  // merge CGI
2919  auto lbcgi = self->pLoadBalancer->GetParams();
2920  auto dtcgi = self->pDataServer->GetParams();
2921  MessageUtils::MergeCGI( lbcgi, dtcgi, false );
2922  // update tried CGI
2923  auto itr = lbcgi.find( "tried" );
2924  if( itr == lbcgi.end() )
2925  lbcgi["tried"] = self->pDataServer->GetHostName();
2926  else
2927  {
2928  std::string tried = itr->second;
2929  tried += "," + self->pDataServer->GetHostName();
2930  lbcgi["tried"] = tried;
2931  }
2932  self->pLoadBalancer->SetParams( lbcgi );
2933 
2934  return ReOpenFileAtServer( self, *self->pLoadBalancer, timeout );
2935  }
2936 
2937  //------------------------------------------------------------------------
2938  // Generic implementation of xattr operation
2939  //------------------------------------------------------------------------
2940  template<typename T>
2941  Status FileStateHandler::XAttrOperationImpl( std::shared_ptr<FileStateHandler> &self,
2942  kXR_char subcode,
2943  kXR_char options,
2944  const std::vector<T> &attrs,
2945  ResponseHandler *handler,
2946  time_t timeout )
2947  {
2948  //--------------------------------------------------------------------------
2949  // Issue a new fattr request
2950  //--------------------------------------------------------------------------
2951  Message *msg;
2952  ClientFattrRequest *req;
2953  MessageUtils::CreateRequest( msg, req );
2954 
2955  req->requestid = kXR_fattr;
2956  req->subcode = subcode;
2957  req->numattr = attrs.size();
2958  req->options = options;
2959  memcpy( req->fhandle, self->pFileHandle, 4 );
2960  XRootDStatus st = MessageUtils::CreateXAttrBody( msg, attrs );
2961  if( !st.IsOK() ) return st;
2962 
2963  MessageSendParams params;
2964  params.timeout = timeout;
2965  params.followRedirects = false;
2966  params.stateful = true;
2968 
2970  StatefulHandler *stHandler = new StatefulHandler( self, handler, msg, params );
2971 
2972  return SendOrQueue( self, *self->pDataServer, msg, stHandler, params );
2973  }
2974 
2975  //----------------------------------------------------------------------------
2976  // Send a message to a host or put it in the recovery queue
2977  //----------------------------------------------------------------------------
2978  Status FileStateHandler::SendOrQueue( std::shared_ptr<FileStateHandler> &self,
2979  const URL &url,
2980  Message *msg,
2981  ResponseHandler *handler,
2982  MessageSendParams &sendParams )
2983  {
2984  //--------------------------------------------------------------------------
2985  // Recovering
2986  //--------------------------------------------------------------------------
2987  if( self->pFileState == Recovering )
2988  {
2989  return RecoverMessage( self, RequestData( msg, handler, sendParams ), false );
2990  }
2991 
2992  //--------------------------------------------------------------------------
2993  // Trying to send
2994  //--------------------------------------------------------------------------
2995  if( self->pFileState == Opened )
2996  {
2997  msg->SetSessionId( self->pSessionId );
2998  XRootDStatus st = self->IssueRequest( *self->pDataServer, msg, handler, sendParams );
2999 
3000  //------------------------------------------------------------------------
3001  // Invalid session id means that the connection has been broken while we
3002  // were idle so we haven't been informed about this fact earlier.
3003  //------------------------------------------------------------------------
3004  if( !st.IsOK() && st.code == errInvalidSession && self->IsRecoverable( st ) )
3005  return RecoverMessage( self, RequestData( msg, handler, sendParams ), false );
3006 
3007  if( st.IsOK() )
3008  self->pInTheFly.insert(msg);
3009  else
3010  delete handler;
3011  return st;
3012  }
3013  return Status( stError, errInvalidOp );
3014  }
3015 
3016  //----------------------------------------------------------------------------
3017  // Check if the stateful error is recoverable
3018  //----------------------------------------------------------------------------
3019  bool FileStateHandler::IsRecoverable( const XRootDStatus &status ) const
3020  {
3021  const auto recoverable_errors = {
3025  errInternal,
3026  errTlsError,
3028  };
3029 
3030  if (pDoRecoverRead || pDoRecoverWrite)
3031  for (const auto error : recoverable_errors)
3032  if (status.code == error)
3033  return IsReadOnly() ? pDoRecoverRead : pDoRecoverWrite;
3034 
3035  return false;
3036  }
3037 
3038  //----------------------------------------------------------------------------
3039  // Check if the file is open for read only
3040  //----------------------------------------------------------------------------
3041  bool FileStateHandler::IsReadOnly() const
3042  {
3043  // Keeping the check for append (with a cast) as this was previously tested,
3044  // but OpenFlags::Flags does not currently enumerate the Append flag
3045  if( (pOpenFlags & OpenFlags::Read) && !(pOpenFlags & OpenFlags::Update) &&
3046  !(pOpenFlags & static_cast<OpenFlags::Flags>(kXR_open_apnd)) )
3047  return true;
3048  return false;
3049  }
3050 
3051  //----------------------------------------------------------------------------
3052  // Recover a message
3053  //----------------------------------------------------------------------------
3054  Status FileStateHandler::RecoverMessage( std::shared_ptr<FileStateHandler> &self,
3055  RequestData rd,
3056  bool callbackOnFailure )
3057  {
3058  self->pFileState = Recovering;
3059 
3060  Log *log = DefaultEnv::GetLog();
3061  log->Dump( FileMsg, "[%p@%s] Putting message %s in the recovery list",
3062  (void*)self.get(), self->pFileUrl->GetObfuscatedURL().c_str(),
3063  rd.request->GetObfuscatedDescription().c_str() );
3064 
3065  Status st = RunRecovery( self );
3066  if( st.IsOK() )
3067  {
3068  self->pToBeRecovered.push_back( rd );
3069  return st;
3070  }
3071 
3072  if( callbackOnFailure )
3073  self->FailMessage( rd, st );
3074 
3075  return st;
3076  }
3077 
3078  //----------------------------------------------------------------------------
3079  // Run the recovery procedure if appropriate
3080  //----------------------------------------------------------------------------
3081  Status FileStateHandler::RunRecovery( std::shared_ptr<FileStateHandler> &self )
3082  {
3083  if( self->pFileState != Recovering )
3084  return Status();
3085 
3086  if( !self->pInTheFly.empty() )
3087  return Status();
3088 
3089  Log *log = DefaultEnv::GetLog();
3090  log->Debug( FileMsg, "[%p@%s] Running the recovery procedure", (void*)self.get(),
3091  self->pFileUrl->GetObfuscatedURL().c_str() );
3092 
3093  Status st;
3094  if( self->pStateRedirect )
3095  {
3096  SendClose( self, 0 );
3097  st = ReOpenFileAtServer( self, *self->pStateRedirect, 0 );
3098  delete self->pStateRedirect; self->pStateRedirect = 0;
3099  }
3100  else if( self->IsReadOnly() && self->pLoadBalancer )
3101  st = ReOpenFileAtServer( self, *self->pLoadBalancer, 0 );
3102  else
3103  st = ReOpenFileAtServer( self, *self->pDataServer, 0 );
3104 
3105  if( !st.IsOK() )
3106  {
3107  self->pFileState = Error;
3108  self->pStatus = st;
3109  self->FailQueuedMessages( st );
3110  }
3111 
3112  return st;
3113  }
3114 
3115  //----------------------------------------------------------------------------
3116  // Send a close and ignore the response
3117  //----------------------------------------------------------------------------
3118  XRootDStatus FileStateHandler::SendClose( std::shared_ptr<FileStateHandler> &self,
3119  time_t timeout )
3120  {
3121  Message *msg;
3122  ClientCloseRequest *req;
3123  MessageUtils::CreateRequest( msg, req );
3124 
3125  req->requestid = kXR_close;
3126  memcpy( req->fhandle, self->pFileHandle, 4 );
3127 
3129  msg->SetSessionId( self->pSessionId );
3131  [self]( XRootDStatus&, AnyObject& ) mutable { self.reset(); } );
3132  MessageSendParams params;
3133  params.timeout = timeout;
3134  params.followRedirects = false;
3135  params.stateful = true;
3136 
3138 
3139  return self->IssueRequest( *self->pDataServer, msg, handler, params );
3140  }
3141 
3142  //----------------------------------------------------------------------------
3143  // Re-open the current file at a given server
3144  //----------------------------------------------------------------------------
3145  XRootDStatus FileStateHandler::ReOpenFileAtServer( std::shared_ptr<FileStateHandler> &self,
3146  const URL &url,
3147  time_t timeout )
3148  {
3149  Log *log = DefaultEnv::GetLog();
3150  log->Dump( FileMsg, "[%p@%s] Sending a recovery open command to %s",
3151  (void*)self.get(), self->pFileUrl->GetObfuscatedURL().c_str(), url.GetObfuscatedURL().c_str() );
3152 
3153  //--------------------------------------------------------------------------
3154  // Remove the kXR_delete and kXR_new flags, as we don't want the recovery
3155  // procedure to delete a file that has been partially updated or fail it
3156  // because a partially uploaded file already exists.
3157  //--------------------------------------------------------------------------
3158  if( self->pOpenFlags & OpenFlags::Delete)
3159  {
3160  self->pOpenFlags &= ~OpenFlags::Delete;
3161  self->pOpenFlags |= OpenFlags::Update;
3162  }
3163 
3164  self->pOpenFlags &= ~OpenFlags::New;
3165 
3166  Message *msg;
3167  ClientOpenRequest *req;
3168  URL u = url;
3169 
3170  if( url.GetPath().empty() )
3171  u.SetPath( self->pFileUrl->GetPath() );
3172 
3173  std::string path = u.GetPathWithFilteredParams();
3174  MessageUtils::CreateRequest( msg, req, path.length() );
3175 
3176  req->requestid = kXR_open;
3177  req->mode = self->pOpenMode;
3178  req->options = (self->pOpenFlags & 0xffff);
3179  req->dlen = path.length();
3180  URL sendUrl;
3181  XRootDStatus st = FillFhTempl( self, url, msg, sendUrl );
3182  if( !st.IsOK() )
3183  {
3184  self->pStatus = st;
3185  self->pFileState = Closed;
3186  return st;
3187  }
3188  msg->Append( path.c_str(), path.length(), 24 );
3189 
3190  // create a new reopen handler
3191  // (it is not assigned to 'pReOpenHandler' in order not to bump the reference counter
3192  // until we know that 'SendMessage' was successful)
3193  OpenHandler *openHandler = new OpenHandler( self, 0 );
3194  MessageSendParams params; params.timeout = timeout;
3197 
3198  //--------------------------------------------------------------------------
3199  // Issue the open request
3200  //--------------------------------------------------------------------------
3201  st = self->IssueRequest( sendUrl, msg, openHandler, params );
3202 
3203  // if there was a problem destroy the open handler
3204  if( !st.IsOK() )
3205  {
3206  delete openHandler;
3207  self->pStatus = st;
3208  self->pFileState = Closed;
3209  }
3210  return st;
3211  }
3212 
3213  //------------------------------------------------------------------------
3214  // Fail a message
3215  //------------------------------------------------------------------------
3216  void FileStateHandler::FailMessage( RequestData rd, XRootDStatus status )
3217  {
3218  Log *log = DefaultEnv::GetLog();
3219  log->Dump( FileMsg, "[%p@%s] Failing message %s with %s",
3220  (void*)this, pFileUrl->GetObfuscatedURL().c_str(),
3221  rd.request->GetObfuscatedDescription().c_str(),
3222  status.ToStr().c_str() );
3223 
3224  StatefulHandler *sh = dynamic_cast<StatefulHandler*>(rd.handler);
3225  if( !sh )
3226  {
3227  Log *log = DefaultEnv::GetLog();
3228  log->Error( FileMsg, "[%p@%s] Internal error while recovering %s",
3229  (void*)this, pFileUrl->GetObfuscatedURL().c_str(),
3230  rd.request->GetObfuscatedDescription().c_str() );
3231  return;
3232  }
3233 
3235  ResponseHandler *userHandler = sh->GetUserHandler();
3236  jobMan->QueueJob( new ResponseJob(
3237  userHandler,
3238  new XRootDStatus( status ),
3239  0, rd.params.hostList ) );
3240 
3241  delete sh;
3242  }
3243 
3244  //----------------------------------------------------------------------------
3245  // Fail queued messages
3246  //----------------------------------------------------------------------------
3247  void FileStateHandler::FailQueuedMessages( XRootDStatus status )
3248  {
3249  RequestList::iterator it;
3250  for( it = pToBeRecovered.begin(); it != pToBeRecovered.end(); ++it )
3251  FailMessage( *it, status );
3252  pToBeRecovered.clear();
3253  }
3254 
3255  //------------------------------------------------------------------------
3256  // Re-send queued messages
3257  //------------------------------------------------------------------------
3258  void FileStateHandler::ReSendQueuedMessages()
3259  {
3260  RequestList::iterator it;
3261  for( it = pToBeRecovered.begin(); it != pToBeRecovered.end(); ++it )
3262  {
3263  it->request->SetSessionId( pSessionId );
3264  ReWriteFileHandle( it->request );
3265  XRootDStatus st = IssueRequest( *pDataServer, it->request,
3266  it->handler, it->params );
3267  if( !st.IsOK() )
3268  FailMessage( *it, st );
3269  }
3270  pToBeRecovered.clear();
3271  }
3272 
3273  //------------------------------------------------------------------------
3274  // Re-write file handle
3275  //------------------------------------------------------------------------
3276  void FileStateHandler::ReWriteFileHandle( Message *msg )
3277  {
3279  switch( hdr->requestid )
3280  {
3281  case kXR_read:
3282  {
3284  memcpy( req->fhandle, pFileHandle, 4 );
3285  break;
3286  }
3287  case kXR_write:
3288  {
3290  memcpy( req->fhandle, pFileHandle, 4 );
3291  break;
3292  }
3293  case kXR_sync:
3294  {
3296  memcpy( req->fhandle, pFileHandle, 4 );
3297  break;
3298  }
3299  case kXR_truncate:
3300  {
3302  memcpy( req->fhandle, pFileHandle, 4 );
3303  break;
3304  }
3305  case kXR_readv:
3306  {
3308  readahead_list *dataChunk = (readahead_list*)msg->GetBuffer( 24 );
3309  for( size_t i = 0; i < req->dlen/sizeof(readahead_list); ++i )
3310  memcpy( dataChunk[i].fhandle, pFileHandle, 4 );
3311  break;
3312  }
3313  case kXR_writev:
3314  {
3315  ClientWriteVRequest *req =
3316  reinterpret_cast<ClientWriteVRequest*>( msg->GetBuffer() );
3317  XrdProto::write_list *wrtList =
3318  reinterpret_cast<XrdProto::write_list*>( msg->GetBuffer( 24 ) );
3319  size_t size = req->dlen / sizeof(XrdProto::write_list);
3320  for( size_t i = 0; i < size; ++i )
3321  memcpy( wrtList[i].fhandle, pFileHandle, 4 );
3322  break;
3323  }
3324  case kXR_pgread:
3325  {
3327  memcpy( req->fhandle, pFileHandle, 4 );
3328  break;
3329  }
3330  case kXR_pgwrite:
3331  {
3333  memcpy( req->fhandle, pFileHandle, 4 );
3334  break;
3335  }
3336  }
3337 
3338  Log *log = DefaultEnv::GetLog();
3339  log->Dump( FileMsg, "[%p@%s] Rewritten file handle for %s to %#x",
3340  (void*)this, pFileUrl->GetObfuscatedURL().c_str(), msg->GetObfuscatedDescription().c_str(),
3341  *((uint32_t*)pFileHandle) );
3343  }
3344 
3345  //----------------------------------------------------------------------------
3346  // Dispatch monitoring information on close
3347  //----------------------------------------------------------------------------
3348  void FileStateHandler::MonitorClose( const XRootDStatus *status )
3349  {
3351  if( mon )
3352  {
3354  i.file = pFileUrl;
3355  i.oTOD = pOpenTime;
3356  gettimeofday( &i.cTOD, 0 );
3357  i.rBytes = pRBytes;
3358  i.vrBytes = pVRBytes;
3359  i.wBytes = pWBytes;
3360  i.vwBytes = pVWBytes;
3361  i.vSegs = pVSegs;
3362  i.rCount = pRCount;
3363  i.vCount = pVRCount;
3364  i.wCount = pWCount;
3365  i.status = status;
3366  mon->Event( Monitor::EvClose, &i );
3367  }
3368  }
3369 
3370  XRootDStatus FileStateHandler::IssueRequest( const URL &url,
3371  Message *msg,
3372  ResponseHandler *handler,
3373  MessageSendParams &sendParams )
3374  {
3375  // first handle Metalinks
3376  if( pUseVirtRedirector && url.IsMetalink() )
3377  return MessageUtils::RedirectMessage( url, msg, handler,
3378  sendParams, pLFileHandler );
3379 
3380  // than local file access
3381  if( url.IsLocalFile() )
3382  return pLFileHandler->ExecRequest( url, msg, handler, sendParams );
3383 
3384  // and finally ordinary XRootD requests
3385  return MessageUtils::SendMessage( url, msg, handler,
3386  sendParams, pLFileHandler );
3387  }
3388 
3389  //------------------------------------------------------------------------
3390  // Send a write request with payload being stored in a kernel buffer
3391  //------------------------------------------------------------------------
3392  XRootDStatus FileStateHandler::WriteKernelBuffer( std::shared_ptr<FileStateHandler> &self,
3393  uint64_t offset,
3394  uint32_t length,
3395  std::unique_ptr<XrdSys::KernelBuffer> kbuff,
3396  ResponseHandler *handler,
3397  time_t timeout )
3398  {
3399  //--------------------------------------------------------------------------
3400  // Create the write request
3401  //--------------------------------------------------------------------------
3402  XrdSysMutexHelper scopedLock( self->pMutex );
3403 
3404  if( self->pFileState != Opened && self->pFileState != Recovering )
3405  return XRootDStatus( stError, errInvalidOp );
3406 
3407  Log *log = DefaultEnv::GetLog();
3408  log->Debug( FileMsg, "[%p@%s] Sending a write command for handle %#x to %s",
3409  (void*)self.get(), self->pFileUrl->GetObfuscatedURL().c_str(),
3410  *((uint32_t*)self->pFileHandle), self->pDataServer->GetHostId().c_str() );
3411 
3412  Message *msg;
3413  ClientWriteRequest *req;
3414  MessageUtils::CreateRequest( msg, req );
3415 
3416  req->requestid = kXR_write;
3417  req->offset = offset;
3418  req->dlen = length;
3419  memcpy( req->fhandle, self->pFileHandle, 4 );
3420 
3421  MessageSendParams params;
3422  params.timeout = timeout;
3423  params.followRedirects = false;
3424  params.stateful = true;
3425  params.kbuff = kbuff.release();
3426  params.chunkList = new ChunkList();
3427 
3429 
3431  StatefulHandler *stHandler = new StatefulHandler( self, handler, msg, params );
3432 
3433  return SendOrQueue( self, *self->pDataServer, msg, stHandler, params );
3434  }
3435 
3436  //------------------------------------------------------------------------
3437  // Fills in the file template value and optiont fields that need the
3438  // template (i.e. samefs and dup) in an Open message request
3439  //------------------------------------------------------------------------
3440  XRootDStatus FileStateHandler::FillFhTempl(
3441  std::shared_ptr<FileStateHandler> &self,
3442  const URL &url, Message *msg, URL &sendUrl)
3443  {
3445  sendUrl = url;
3446 
3447  if( !self->NeedFileTempl() )
3448  {
3449  // template file not requireed
3450  return XRootDStatus();
3451  }
3452 
3453  using wp = std::weak_ptr<FileStateHandler>;
3454  if( !self->pTemplateFileWp.owner_before(wp{}) &&
3455  !wp{}.owner_before(self->pTemplateFileWp) )
3456  {
3457  // no tempalte file was set
3458  return XRootDStatus( stError, errInvalidArgs, 0,
3459  "File flags required a template file" );
3460  }
3461 
3462  // all the options that need template
3463  if( self->pOpenFlags & OpenFlags::Dup )
3464  req->optiont |= kXR_dup;
3465  if( self->pOpenFlags & OpenFlags::Samefs )
3466  req->optiont |= kXR_samefs;
3467 
3468  std::shared_ptr<FileStateHandler> tfp = self->pTemplateFileWp.lock();
3469  if(!tfp)
3470  return XRootDStatus( stError, errInvalidArgs, 0,
3471  "Template file object does not exist" );
3472 
3473  XrdSysMutexHelper scopedLock( tfp->pMutex );
3474 
3475  if( tfp->pFileState != Opened )
3476  return XRootDStatus( stError, errInvalidOp, 0,
3477  "Template file not open" );
3478 
3479  if (!tfp->pDataServer || !tfp->pFileHandle)
3480  return XRootDStatus( stError, errInvalidArgs, 0,
3481  "Template file not connected" );
3482 
3483  sendUrl.SetHostPort( tfp->pDataServer->GetHostName(),tfp->pDataServer->GetPort() );
3484  sendUrl.SetUserName( tfp->pDataServer->GetUserName() );
3485  msg->SetSessionId( tfp->pSessionId );
3486  memcpy( req->fhtemplt, tfp->pFileHandle, sizeof(req->fhtemplt) );
3487 
3488  if( !Utils::HasKSameFS( sendUrl ) )
3490 
3491  return XRootDStatus();
3492  }
3493 
3494  //------------------------------------------------------------------------
3495  // Clone file ranges into current file
3496  //------------------------------------------------------------------------
3497  XRootDStatus FileStateHandler::Clone(std::shared_ptr<FileStateHandler> &self,
3498  const CloneLocations &locs,
3499  ResponseHandler *handler,
3500  time_t timeout )
3501  {
3502  XrdSysMutexHelper scopedLock( self->pMutex );
3503 
3504  if( self->pFileState == Error ) return self->pStatus;
3505 
3506  if( self->pFileState != Opened && self->pFileState != Recovering )
3507  return XRootDStatus( stError, errInvalidOp );
3508 
3509  if( !Utils::HasKSameFS( *self->pDataServer ) )
3511 
3512  Log *log = DefaultEnv::GetLog();
3513  log->Debug( FileMsg, "[%p@%s] Sending a clone command for handle %#x to %s",
3514  self.get(), self->pFileUrl->GetURL().c_str(),
3515  *((uint32_t*)self->pFileHandle), self->pDataServer->GetHostId().c_str() );
3516 
3517  Message *msg;
3518  ClientReadRequest *req;
3519 
3520  size_t nrange = locs.locations.size();
3521 
3522  MessageUtils::CreateRequest( msg, req, sizeof(XrdProto::clone_list)*nrange );
3523 
3524  req->requestid = kXR_clone;
3525  req->dlen = sizeof(XrdProto::clone_list)*nrange;
3526  memcpy( req->fhandle, self->pFileHandle, 4 );
3527 
3529  int idx=0;
3530  for(auto &loc: locs.locations)
3531  {
3532  if( !loc.file )
3533  return XRootDStatus( stError, errInvalidOp, 0,
3534  "Template file not available" );
3535 
3536  FileStateHandlerTemplate *fht = dynamic_cast<FileStateHandlerTemplate*>(loc.file.get());
3537  if( !fht )
3538  return XRootDStatus( stError, errInvalidOp, 0,
3539  "Template file invalid" );
3540 
3541  std::shared_ptr<FileStateHandler> tfp = fht->pTemplateFileWp.lock();
3542  if( !tfp )
3543  return XRootDStatus( stError, errInvalidOp, 0,
3544  "Template file object does not exist" );
3545 
3546  XrdSysMutexHelper scopedLock( tfp->pMutex );
3547  if( tfp->pFileState != Opened )
3548  return XRootDStatus( stError, errInvalidOp, 0,
3549  "Template file not open" );
3550 
3551  if( tfp->pSessionId != self->pSessionId )
3552  return XRootDStatus( stError, errInvalidOp, 0,
3553  "Clone source not at same location as destination" );
3554 
3555  memcpy( cl[idx].srcFH, tfp->pFileHandle, 4 );
3556  cl[idx].srcOffs = loc.srcOffs;
3557  cl[idx].srcLen = loc.srcLen;
3558  cl[idx].dstOffs = loc.dstOffs;
3559  ++idx;
3560  }
3561 
3563  MessageSendParams params;
3564  params.timeout = timeout;
3565  params.followRedirects = false;
3566  params.stateful = true;
3568  StatefulHandler *stHandler = new StatefulHandler( self, handler, msg, params );
3569 
3570  return SendOrQueue( self, *self->pDataServer, msg, stHandler, params );
3571  }
3572 }
kXR_unt16 requestid
Definition: XProtocol.hh:511
kXR_unt16 requestid
Definition: XProtocol.hh:666
kXR_unt16 requestid
Definition: XProtocol.hh:847
@ kXR_fattrDel
Definition: XProtocol.hh:300
@ kXR_fattrSet
Definition: XProtocol.hh:303
@ kXR_fattrList
Definition: XProtocol.hh:302
@ kXR_fattrGet
Definition: XProtocol.hh:301
#define kXR_suppgrw
Definition: XProtocol.hh:1216
kXR_char fhandle[4]
Definition: XProtocol.hh:565
kXR_char fhandle[4]
Definition: XProtocol.hh:823
struct ClientPgReadRequest pgread
Definition: XProtocol.hh:903
kXR_char fhandle[4]
Definition: XProtocol.hh:848
kXR_char fhandle[4]
Definition: XProtocol.hh:812
kXR_int64 offset
Definition: XProtocol.hh:682
kXR_unt16 requestid
Definition: XProtocol.hh:680
@ kXR_virtReadv
Definition: XProtocol.hh:152
kXR_char fhtemplt[4]
Definition: XProtocol.hh:516
kXR_unt16 options
Definition: XProtocol.hh:513
static const int kXR_ckpXeq
Definition: XProtocol.hh:218
struct ClientPgWriteRequest pgwrite
Definition: XProtocol.hh:904
kXR_unt16 requestid
Definition: XProtocol.hh:257
@ kXR_async
Definition: XProtocol.hh:488
@ kXR_open_apnd
Definition: XProtocol.hh:492
@ kXR_retstat
Definition: XProtocol.hh:493
struct ClientRequestHdr header
Definition: XProtocol.hh:887
kXR_char fhandle[4]
Definition: XProtocol.hh:543
#define kXR_recoverWrts
Definition: XProtocol.hh:1208
kXR_unt16 optiont
Definition: XProtocol.hh:514
kXR_unt16 infotype
Definition: XProtocol.hh:667
kXR_char fhandle[4]
Definition: XProtocol.hh:681
kXR_char fhandle[4]
Definition: XProtocol.hh:258
kXR_unt16 requestid
Definition: XProtocol.hh:159
kXR_char fhandle[4]
Definition: XProtocol.hh:669
@ kXR_read
Definition: XProtocol.hh:126
@ kXR_open
Definition: XProtocol.hh:123
@ kXR_writev
Definition: XProtocol.hh:144
@ kXR_clone
Definition: XProtocol.hh:145
@ kXR_readv
Definition: XProtocol.hh:138
@ kXR_sync
Definition: XProtocol.hh:129
@ kXR_fattr
Definition: XProtocol.hh:133
@ kXR_query
Definition: XProtocol.hh:114
@ kXR_write
Definition: XProtocol.hh:132
@ kXR_truncate
Definition: XProtocol.hh:141
@ kXR_stat
Definition: XProtocol.hh:130
@ kXR_pgread
Definition: XProtocol.hh:143
@ kXR_chkpoint
Definition: XProtocol.hh:125
@ kXR_close
Definition: XProtocol.hh:116
@ kXR_pgwrite
Definition: XProtocol.hh:139
kXR_int32 dlen
Definition: XProtocol.hh:684
struct ClientReadRequest read
Definition: XProtocol.hh:909
kXR_int32 rlen
Definition: XProtocol.hh:696
kXR_unt16 requestid
Definition: XProtocol.hh:808
kXR_int32 dlen
Definition: XProtocol.hh:517
kXR_char fhandle[4]
Definition: XProtocol.hh:835
kXR_unt16 mode
Definition: XProtocol.hh:512
kXR_unt16 requestid
Definition: XProtocol.hh:542
kXR_unt16 requestid
Definition: XProtocol.hh:822
kXR_char fhandle[4]
Definition: XProtocol.hh:206
kXR_int64 offset
Definition: XProtocol.hh:697
#define kXR_PROTPGRWVERSION
Definition: XProtocol.hh:73
kXR_int64 offset
Definition: XProtocol.hh:849
@ kXR_dup
Definition: XProtocol.hh:503
@ kXR_samefs
Definition: XProtocol.hh:504
struct ClientWriteRequest write
Definition: XProtocol.hh:918
kXR_int32 rlen
Definition: XProtocol.hh:683
kXR_unt16 requestid
Definition: XProtocol.hh:706
@ kXR_Qvisa
Definition: XProtocol.hh:656
kXR_int32 dlen
Definition: XProtocol.hh:161
unsigned char kXR_char
Definition: XPtypes.hh:65
static int mapError(int rc)
Definition: XProtocol.hh:1404
void Set(Type object, bool own=true)
void Get(Type &object)
Retrieve the object being held.
Binary blob representation.
Definition: XrdClBuffer.hh:34
const char * GetBuffer(uint32_t offset=0) const
Get the message buffer.
Definition: XrdClBuffer.hh:72
void Append(const char *buffer, uint32_t size)
Append data at the position pointed to by the append cursor.
Definition: XrdClBuffer.hh:164
uint32_t GetSize() const
Get the size of the message.
Definition: XrdClBuffer.hh:132
static Monitor * GetMonitor()
Get the monitor object.
static Log * GetLog()
Get default log.
static PostMaster * GetPostMaster()
Get default post master.
static FileTimer * GetFileTimer()
Get file timer task.
static ForkHandler * GetForkHandler()
Get the fork handler.
static Env * GetEnv()
Get default client environment.
XRootDStatus Open(uint16_t flags, ResponseHandler *handler, time_t timeout)
bool GetInt(const std::string &key, int &value)
Definition: XrdClEnv.cc:89
An interface for file plug-ins.
std::weak_ptr< FileStateHandler > pTemplateFileWp
static XRootDStatus PgReadRetry(std::shared_ptr< FileStateHandler > &self, uint64_t offset, uint32_t size, size_t pgnb, void *buffer, PgReadHandler *handler, time_t timeout=0)
static XRootDStatus TryOtherServer(std::shared_ptr< FileStateHandler > &self, time_t timeout)
Try other data server.
static XRootDStatus Read(std::shared_ptr< FileStateHandler > &self, uint64_t offset, uint32_t size, void *buffer, ResponseHandler *handler, time_t timeout=0)
static XRootDStatus SetXAttr(std::shared_ptr< FileStateHandler > &self, const std::vector< xattr_t > &attrs, ResponseHandler *handler, time_t timeout=0)
static XRootDStatus Visa(std::shared_ptr< FileStateHandler > &self, ResponseHandler *handler, time_t timeout=0)
void AfterForkChild()
Called in the child process after the fork.
static XRootDStatus PgReadImpl(std::shared_ptr< FileStateHandler > &self, uint64_t offset, uint32_t size, void *buffer, uint16_t flags, ResponseHandler *handler, time_t timeout=0)
static XRootDStatus Clone(std::shared_ptr< FileStateHandler > &self, const CloneLocations &locs, ResponseHandler *handler, time_t timeout=0)
static void OnStateRedirection(std::shared_ptr< FileStateHandler > &self, const std::string &redirectUrl, Message *message, ResponseHandler *userHandler, MessageSendParams &sendParams)
Handle stateful redirect.
void TimeOutRequests(time_t now)
Declare timeout on requests being recovered.
static XRootDStatus Fcntl(std::shared_ptr< FileStateHandler > &self, QueryCode::Code queryCode, const Buffer &arg, ResponseHandler *handler, time_t timeout=0)
static XRootDStatus ListXAttr(std::shared_ptr< FileStateHandler > &self, ResponseHandler *handler, time_t timeout=0)
static XRootDStatus Truncate(std::shared_ptr< FileStateHandler > &self, uint64_t size, ResponseHandler *handler, time_t timeout=0)
static XRootDStatus Checkpoint(std::shared_ptr< FileStateHandler > &self, kXR_char code, ResponseHandler *handler, time_t timeout=0)
static XRootDStatus PgWrite(std::shared_ptr< FileStateHandler > &self, uint64_t offset, uint32_t size, const void *buffer, std::vector< uint32_t > &cksums, ResponseHandler *handler, time_t timeout=0)
static void OnStateError(std::shared_ptr< FileStateHandler > &self, XRootDStatus *status, Message *message, ResponseHandler *userHandler, MessageSendParams &sendParams)
Handle an error while sending a stateful message.
static XRootDStatus Stat(std::shared_ptr< FileStateHandler > &self, bool force, ResponseHandler *handler, time_t timeout=0)
FileStateHandler(FilePlugIn *&plugin)
Constructor.
static XRootDStatus PgWriteImpl(std::shared_ptr< FileStateHandler > &self, uint64_t offset, uint32_t size, const void *buffer, std::vector< uint32_t > &cksums, kXR_char flags, ResponseHandler *handler, time_t timeout=0)
static XRootDStatus ChkptWrt(std::shared_ptr< FileStateHandler > &self, uint64_t offset, uint32_t size, const void *buffer, ResponseHandler *handler, time_t timeout=0)
static XRootDStatus Write(std::shared_ptr< FileStateHandler > &self, uint64_t offset, uint32_t size, const void *buffer, ResponseHandler *handler, time_t timeout=0)
@ OpenInProgress
Opening is in progress.
@ CloseInProgress
Closing operation is in progress.
@ Opened
Opening has succeeded.
@ Recovering
Recovering from an error.
static XRootDStatus PgWriteRetry(std::shared_ptr< FileStateHandler > &self, uint64_t offset, uint32_t size, const void *buffer, uint32_t digest, ResponseHandler *handler, time_t timeout=0)
static XRootDStatus Close(std::shared_ptr< FileStateHandler > &self, ResponseHandler *handler, time_t timeout=0)
static XRootDStatus ReadV(std::shared_ptr< FileStateHandler > &self, uint64_t offset, struct iovec *iov, int iovcnt, ResponseHandler *handler, time_t timeout=0)
bool SetProperty(const std::string &name, const std::string &value)
static void OnStateResponse(std::shared_ptr< FileStateHandler > &self, XRootDStatus *status, Message *message, AnyObject *response, HostList *hostList)
Handle stateful response.
void OnClose(const XRootDStatus *status)
Process the results of the closing operation.
static XRootDStatus DelXAttr(std::shared_ptr< FileStateHandler > &self, const std::vector< std::string > &attrs, ResponseHandler *handler, time_t timeout=0)
void OnOpen(const XRootDStatus *status, const OpenInfo *openInfo, const HostList *hostList)
Process the results of the opening operation.
static XRootDStatus Open(std::shared_ptr< FileStateHandler > &self, const std::string &url, OpenFlags::Flags flags, uint16_t mode, ResponseHandler *handler, time_t timeout=0)
static XRootDStatus PgRead(std::shared_ptr< FileStateHandler > &self, uint64_t offset, uint32_t size, void *buffer, ResponseHandler *handler, time_t timeout=0)
static XRootDStatus OpenUsingTemplate(std::shared_ptr< FileStateHandler > &self, ExportedFileTemplate *templ, const std::string &url, OpenFlags::Flags flags, uint16_t mode, ResponseHandler *handler, time_t timeout=0)
static XRootDStatus Sync(std::shared_ptr< FileStateHandler > &self, ResponseHandler *handler, time_t timeout=0)
static XRootDStatus PreRead(std::shared_ptr< FileStateHandler > &self, const TractList &tracts, ResponseHandler *handler, time_t timeout=0)
static XRootDStatus ChkptWrtV(std::shared_ptr< FileStateHandler > &self, uint64_t offset, const struct iovec *iov, int iovcnt, ResponseHandler *handler, time_t timeout=0)
static XRootDStatus GetXAttr(std::shared_ptr< FileStateHandler > &self, const std::vector< std::string > &attrs, ResponseHandler *handler, time_t timeout=0)
static XRootDStatus WriteV(std::shared_ptr< FileStateHandler > &self, uint64_t offset, const struct iovec *iov, int iovcnt, ResponseHandler *handler, time_t timeout=0)
bool GetProperty(const std::string &name, std::string &value) const
static XRootDStatus VectorRead(std::shared_ptr< FileStateHandler > &self, const ChunkList &chunks, void *buffer, ResponseHandler *handler, time_t timeout=0)
static XRootDStatus VectorWrite(std::shared_ptr< FileStateHandler > &self, const ChunkList &chunks, ResponseHandler *handler, time_t timeout=0)
bool IsOpen() const
Check if the file is open.
void UnRegisterFileObject(FileStateHandler *file)
Un-register a file state handler.
void RegisterFileObject(FileStateHandler *file)
Register a file state handler.
void RegisterFileObject(FileStateHandler *file)
Register a file object.
void UnRegisterFileObject(FileStateHandler *file)
A synchronized queue.
void QueueJob(Job *job, void *arg=0)
Add a job to be run.
XRootDStatus ExecRequest(const URL &url, Message *msg, ResponseHandler *handler, MessageSendParams &sendParams)
Translate an XRootD request into LocalFileHandler call.
Handle diagnostics.
Definition: XrdClLog.hh:101
void Error(uint64_t topic, const char *format,...)
Report an error.
Definition: XrdClLog.cc:231
void Warning(uint64_t topic, const char *format,...)
Report a warning.
Definition: XrdClLog.cc:248
void Dump(uint64_t topic, const char *format,...)
Print a dump message.
Definition: XrdClLog.cc:299
void Info(uint64_t topic, const char *format,...)
Print an info.
Definition: XrdClLog.cc:265
void Debug(uint64_t topic, const char *format,...)
Print a debug message.
Definition: XrdClLog.cc:282
static void MergeCGI(URL::ParamsMap &cgi1, const URL::ParamsMap &cgi2, bool replace)
Merge cgi2 into cgi1.
static void ProcessSendParams(MessageSendParams &sendParams)
Process sending params.
static Status CreateXAttrBody(Message *msg, const std::vector< T > &vec, const std::string &path="")
static Status RedirectMessage(const URL &url, Message *msg, ResponseHandler *handler, MessageSendParams &sendParams, LocalFileHandler *lFileHandler)
Redirect message.
static XRootDStatus SendMessage(const URL &url, Message *msg, ResponseHandler *handler, MessageSendParams &sendParams, LocalFileHandler *lFileHandler)
Send message.
static void CreateRequest(Message *&msg, Request *&req, uint32_t payloadSize=0)
Create a message.
The message representation used throughout the system.
Definition: XrdClMessage.hh:32
const std::string & GetObfuscatedDescription() const
Get the description of the message with authz parameter obfuscated.
void SetSessionId(uint64_t sessionId)
Set the session ID which this message is meant for.
void SetVirtReqID(uint16_t virtReqID)
Set virtual request ID for the message.
An abstract class to describe the client-side monitoring plugin interface.
Definition: XrdClMonitor.hh:56
@ EvClose
CloseInfo: File closed.
@ EvErrIO
ErrorInfo: An I/O error occurred.
@ EvOpen
OpenInfo: File opened.
virtual void Event(EventCode evCode, void *evData)=0
Open operation (.
Information returned by file open operation.
void GetFileHandle(uint8_t *fileHandle) const
Get the file handle (4bytes)
const StatInfo * GetStatInfo() const
Get the stat info.
uint64_t GetSessionId() const
Status QueryTransport(const URL &url, uint16_t query, AnyObject &result)
JobManager * GetJobManager()
Get the job manager object user by the post master.
void DecFileInstCnt(const URL &url)
Decrement file object instance count bound to this channel.
Singleton access to URL to virtual redirector mapping.
static RedirectorRegistry & Instance()
Returns reference to the single instance.
void Release(const URL &url)
Release the virtual redirector associated with the given URL.
Handle an async response.
virtual void HandleResponseWithHosts(XRootDStatus *status, AnyObject *response, HostList *hostList)
static ResponseHandler * Wrap(std::function< void(XRootDStatus &, AnyObject &)> func)
virtual void HandleResponse(XRootDStatus *status, AnyObject *response)
Call the user callback.
Object stat info.
uint64_t GetSize() const
Get size (in bytes)
URL representation.
Definition: XrdClURL.hh:31
std::string GetHostId() const
Get the host part of the URL (user:password@host:port)
Definition: XrdClURL.hh:99
bool IsMetalink() const
Is it a URL to a metalink.
Definition: XrdClURL.cc:465
std::map< std::string, std::string > ParamsMap
Definition: XrdClURL.hh:33
void SetHostPort(const std::string &hostName, int port)
Definition: XrdClURL.hh:206
void SetParams(const std::string &params)
Set params.
Definition: XrdClURL.cc:402
std::string GetPathWithFilteredParams() const
Get the path with params, filteres out 'xrdcl.'.
Definition: XrdClURL.cc:331
std::string GetURL() const
Get the URL.
Definition: XrdClURL.hh:86
std::string GetObfuscatedURL() const
Get the URL with authz information obfuscated.
Definition: XrdClURL.cc:498
void SetPath(const std::string &path)
Set the path.
Definition: XrdClURL.hh:225
bool IsLocalFile() const
Definition: XrdClURL.cc:474
const ParamsMap & GetParams() const
Get the URL params.
Definition: XrdClURL.hh:244
const std::string & GetPath() const
Get the path.
Definition: XrdClURL.hh:217
void SetUserName(const std::string &userName)
Set the username.
Definition: XrdClURL.hh:143
static bool HasKSameFS(const XrdCl::URL &url)
Check if given server supports kXR_clone and kXR_samefs.
Definition: XrdClUtils.hh:267
static XrdCl::XRootDStatus GetProtocolVersion(const XrdCl::URL url, int &protver)
Definition: XrdClUtils.hh:235
const std::string & GetErrorMessage() const
Get error message.
std::string ToStr() const
Convert to string.
static void SetDescription(Message *msg)
Get the description of a message.
static uint32_t Calc32C(const void *data, size_t count, uint32_t prevcs=0)
Definition: XrdOucCRC.cc:190
static void csCalc(const char *data, off_t offs, size_t count, uint32_t *csval)
static int csNum(off_t offs, int count)
Compute the required size of a checksum vector based on offset & length.
static bool IsPageAligned(const void *ptr)
const uint16_t errSocketOptError
Definition: XrdClStatus.hh:76
const uint16_t errTlsError
Definition: XrdClStatus.hh:80
const uint16_t errOperationExpired
Definition: XrdClStatus.hh:90
const uint16_t errPollerError
Definition: XrdClStatus.hh:75
const uint16_t stError
An error occurred that could potentially be retried.
Definition: XrdClStatus.hh:32
const uint16_t errInProgress
Definition: XrdClStatus.hh:59
const uint16_t errSocketTimeout
Definition: XrdClStatus.hh:73
std::vector< HostInfo > HostList
const uint16_t errDataError
data is corrupted
Definition: XrdClStatus.hh:63
const uint16_t errInternal
Internal error.
Definition: XrdClStatus.hh:56
const uint16_t stOK
Everything went OK.
Definition: XrdClStatus.hh:31
const uint16_t errInvalidOp
Definition: XrdClStatus.hh:51
const uint64_t FileMsg
const uint16_t suAlreadyDone
Definition: XrdClStatus.hh:42
EcHandler * GetEcHandler(const URL &headnode, const URL &redirurl)
std::vector< TractInfo > TractList
List of Tracts.
const uint16_t errInvalidArgs
Definition: XrdClStatus.hh:58
const int DefaultRequestTimeout
std::vector< ChunkInfo > ChunkList
List of chunks.
const uint16_t errConnectionError
Definition: XrdClStatus.hh:78
const uint16_t errNotSupported
Definition: XrdClStatus.hh:62
const uint16_t errSocketError
Definition: XrdClStatus.hh:72
const uint16_t errOperationInterrupted
Definition: XrdClStatus.hh:91
const uint16_t errInvalidSession
Definition: XrdClStatus.hh:79
const uint16_t errRedirect
Definition: XrdClStatus.hh:106
const uint16_t errSocketDisconnected
Definition: XrdClStatus.hh:74
none object for initializing empty Optional
static const int PageSize
ssize_t Read(int fd, KernelBuffer &buffer, uint32_t length, int64_t offset)
ssize_t Move(KernelBuffer &kbuff, char *&ubuff)
static const int aData
Definition: XProtocol.hh:328
kXR_char fhandle[4]
Definition: XProtocol.hh:318
kXR_unt16 requestid
Definition: XProtocol.hh:317
kXR_unt16 requestid
Definition: XProtocol.hh:861
Describe a data chunk for vector read.
void * buffer
length of the chunk
uint32_t length
offset in the file
std::vector< CloneLocation > locations
Definition: XrdClFile.hh:1053
std::vector< uint32_t > crc32cDigests
XrdSys::KernelBuffer * kbuff
Describe a file close event.
uint64_t vwBytes
Total number of bytes written vie writev.
const XRootDStatus * status
Close status.
uint32_t wCount
Total count of writes.
uint64_t vSegs
Total count of readv segments.
uint64_t vrBytes
Total number of bytes read via readv.
timeval cTOD
gettimeofday() when file was closed
uint32_t vCount
Total count of readv.
const URL * file
The file in question.
uint64_t rBytes
Total number of bytes read via read.
timeval oTOD
gettimeofday() when file was opened
uint64_t wBytes
Total number of bytes written.
uint32_t rCount
Total count of reads.
Describe an encountered file-based error.
@ ErrUnc
Unclassified operation.
const XRootDStatus * status
Status code.
const URL * file
The file in question.
Operation opCode
The associated operation.
Describe a file open event to the monitor.
uint64_t fSize
File size in bytes.
const URL * file
File in question.
uint16_t oFlags2
OpenFlags upper 16 bits.
std::string dataServer
Actual fata server.
uint16_t oFlags
OpenFlags.
Open flags, may be or'd when appropriate.
Flags
Open flags, may be or'd when appropriate.
@ Read
Open only for reading.
@ Samefs
Open file on the same filesystem as another.
@ Update
Open for reading and writing.
@ Dup
Open file duplicating content from another.
void SetNbRepair(size_t nbrepair)
Set number of repaired pages.
std::vector< uint32_t > & GetCksums()
Get the checksums.
uint32_t GetLength() const
Get the data length.
uint64_t GetOffset() const
Get the offset.
void * GetBuffer()
Get the buffer.
Code
XRootD query request codes.
std::tuple< uint64_t, uint32_t > At(size_t i)
Procedure execution status.
Definition: XrdClStatus.hh:115
uint16_t code
Error type, or additional hints on what to do.
Definition: XrdClStatus.hh:147
bool IsOK() const
We're fine.
Definition: XrdClStatus.hh:124
static const uint16_t ServerFlags
returns server flags
static const uint16_t IsEncrypted
returns true if the channel is encrypted