XRootD
Loading...
Searching...
No Matches
XrdFrmTransfer.cc
Go to the documentation of this file.
1/******************************************************************************/
2/* */
3/* X r d F r m T r a n s f e r . c c */
4/* */
5/* (c) 2010 by the Board of Trustees of the Leland Stanford, Jr., University */
6/* All Rights Reserved */
7/* Produced by Andrew Hanushevsky for Stanford University under contract */
8/* DE-AC02-76-SFO0515 with the Department of Energy */
9/* */
10/* This file is part of the XRootD software suite. */
11/* */
12/* XRootD is free software: you can redistribute it and/or modify it under */
13/* the terms of the GNU Lesser General Public License as published by the */
14/* Free Software Foundation, either version 3 of the License, or (at your */
15/* option) any later version. */
16/* */
17/* XRootD is distributed in the hope that it will be useful, but WITHOUT */
18/* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or */
19/* FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public */
20/* License for more details. */
21/* */
22/* You should have received a copy of the GNU Lesser General Public License */
23/* along with XRootD in a file called COPYING.LESSER (LGPL license) and file */
24/* COPYING (GPL license). If not, see <http://www.gnu.org/licenses/>. */
25/* */
26/* The copyright holder's institutional names and contributor's names may not */
27/* be used to endorse or promote products derived from this software without */
28/* specific prior written permission of the institution or contributor. */
29/******************************************************************************/
30
31#include <cstring>
32#include <strings.h>
33#include <cstdio>
34#include <fcntl.h>
35#include <unistd.h>
36#include <utime.h>
37#include <sys/param.h>
38#include <sys/types.h>
39#include <sys/stat.h>
40
41#include "XrdFrc/XrdFrcCID.hh"
43#include "XrdFrc/XrdFrcTrace.hh"
44#include "XrdFrc/XrdFrcXAttr.hh"
45#include "XrdFrm/XrdFrmCns.hh"
52#include "XrdOss/XrdOss.hh"
53#include "XrdOuc/XrdOucEnv.hh"
54#include "XrdOuc/XrdOucMsubs.hh"
55#include "XrdOuc/XrdOucProg.hh"
56#include "XrdOuc/XrdOucSxeq.hh"
57#include "XrdOuc/XrdOucUtils.hh"
58#include "XrdOuc/XrdOucXAttr.hh"
59#include "XrdSys/XrdSysError.hh"
60#include "XrdSys/XrdSysFD.hh"
62
63using namespace XrdFrc;
64using namespace XrdFrm;
65
66/******************************************************************************/
67/* L o c a l C l a s s e s */
68/******************************************************************************/
69
71{
75char *theSrc;
76char *theDst;
77char *theINS;
78char theMDP[8];
79
81 : theEnv(Env), theCmd(0), theVec(0), theSrc(0),
82 theDst(0), theINS(0)
83 {theMDP[0] = '0'; theMDP[1] = 0;}
85};
86
88{ struct stat *Stat;
89 int lkfd;
90 int lkfx;
91
92 XrdFrmTranChk(struct stat *sP) : Stat(sP), lkfd(-1), lkfx(0) {}
93 ~XrdFrmTranChk() {if (lkfd >= 0) close(lkfd);}
94};
95
96/******************************************************************************/
97/* S t a t i c s */
98/******************************************************************************/
99
100XrdSysMutex XrdFrmTransfer::pMutex;
101XrdOucHash<char> XrdFrmTransfer::pTab;
102
103/******************************************************************************/
104/* C o n s t r u c t o r */
105/******************************************************************************/
106
108{
109 int i;
110
111// Construct program objects
112//
113 for (i = 0; i < 4; i++)
114 xfrCmd[i] = (Config.xfrCmd[i].theVec ? new XrdOucProg(&Say) : 0);
115}
116
117/******************************************************************************/
118/* Public: c h e c k F F */
119/******************************************************************************/
120
121const char *XrdFrmTransfer::checkFF(const char *Path)
122{
123 EPNAME("checkFF");
124 struct stat buf;
125
126// Check for a fail file
127//
128 if (!stat(Path, &buf))
129 {if (buf.st_ctime+Config.FailHold >= time(0))
130 return "request previously failed";
131 if (Config.Test) {DEBUG("would have removed '" <<Path <<"'");}
132 else {unlink(Path);
133 DEBUG("removed '" <<Path <<"'");
134 }
135 }
136
137// Return all is well
138//
139 return 0;
140}
141
142/******************************************************************************/
143/* F e t c h */
144/******************************************************************************/
145
146const char *XrdFrmTransfer::Fetch()
147{
148 EPNAME("Fetch");
149 static const mode_t fMode = S_IRUSR|S_IWUSR|S_IRGRP|S_IROTH;
150 static const int crOpts = (O_CREAT|O_TRUNC)<<8|XRDOSS_mkpath;
151
152 XrdOucEnv myEnv(xfrP->reqData.Opaque?xfrP->reqData.LFN+xfrP->reqData.Opaque:0);
153 XrdFrmTranArg cmdArg(&myEnv);
154 struct stat pfnStat;
155 time_t xfrET;
156 const char *eTxt, *retMsg = 0;
157 char lfnpath[MAXPATHLEN+1024+512+8], *Lfn, Rfn[MAXPATHLEN+256], *theSrc;
158 char pdBuff[1024];
159 int iXfr, pdSZ, lfnEnd, rc, isURL = 0, doRM = 0;
160 long long fSize = 0;
161
162// The remote source is either the url-lfn or a translated lfn
163//
164 if ((isURL = xfrP->reqData.LFO)) theSrc = xfrP->reqData.LFN;
165 else {if (!Config.RemotePath(xfrP->reqData.LFN, Rfn, sizeof(Rfn)))
166 return "lfn2rfn failed";
167 theSrc = Rfn;
168 isURL = (*Rfn != '/');
169 }
170
171// Check if we can actually handle this transfer
172//
173 if (isURL)
174 {if (xfrCmd[2]) iXfr = 2;
175 else return "url copies not configured";
176 } else {
177 if (xfrCmd[0]) iXfr = 0;
178 else return "non-url copies not configured";
179 }
180
181// Check for a fail file
182//
183 if ((eTxt = ffCheck())) return eTxt;
184
185// Check if the file exists
186//
187 Lfn = (xfrP->reqData.LFN)+xfrP->reqData.LFO;
188 if (!Config.Stat(Lfn, xfrP->PFN, &pfnStat))
189 {DEBUG(xfrP->PFN <<" exists; not fetched.");
190 return 0;
191 }
192
193// Construct the file name to which to we originally transfer the data. This is
194// the lfn if we do not pre-allocate files and "lfn.anew" otherwise.
195//
196 lfnEnd = strlen(Lfn);
197 strlcpy(lfnpath, Lfn, sizeof(lfnpath)-8);
198 if (Config.xfrCmd[iXfr].Opts & Config.cmdAlloc)
199 {strcpy(&lfnpath[lfnEnd], ".anew");
200 strcpy(&xfrP->PFN[xfrP->pfnEnd], ".anew");
201 }
202
203// Setup the command
204//
205 cmdArg.theCmd = xfrCmd[iXfr];
206 cmdArg.theVec = Config.xfrCmd[iXfr].theVec;
207 cmdArg.theSrc = theSrc;
208 cmdArg.theDst = xfrP->PFN;
209 cmdArg.theINS = xfrP->reqData.iName;
210 if (!SetupCmd(&cmdArg)) return "incoming transfer setup failed";
211
212// If the copycmd needs a placeholder in the filesystem for this transfer, we
213// must create one. We first remove any existing "anew" file because we will
214// over-write it. The create process will create a lock file if need be.
215// However, we can ignore it as we are the only ones actually using it.
216//
217 if (Config.xfrCmd[iXfr].Opts & Config.cmdAlloc)
218 {Config.ossFS->Unlink(lfnpath);
219 rc = Config.ossFS->Create(xfrP->reqData.User,lfnpath,fMode,myEnv,crOpts);
220 if (rc)
221 {Say.Emsg("Fetch", rc, "create placeholder for", lfnpath);
222 return "create failed";
223 }
224 doRM = 1;
225 } else doRM = Config.xfrCmd[iXfr].Opts & Config.cmdRME;
226
227// Setup program monitoring data
228//
229 pdSZ = (Config.xfrCmd[iXfr].Opts & Config.cmdXPD ? sizeof(pdBuff) : 0);
230
231// Now run the command to get the file and make sure the file is there
232// If it is, make sure that if a lock file exists its date/time is greater than
233// the file we just fetched; then rename it to be the correct name.
234//
235 xfrET = time(0);
236 if (!(rc = cmdArg.theCmd->Run(pdBuff, pdSZ)))
237 {if ((rc = Config.Stat(lfnpath, xfrP->PFN, &pfnStat)))
238 {Say.Emsg("Fetch", lfnpath, "fetched but not resident!"); fSize = 0;}
239 else {fSize = pfnStat.st_size;
240 if (Config.xfrCmd[iXfr].Opts & Config.cmdAlloc)
241 FetchDone(lfnpath, pfnStat, rc);
242 }
243 }
244
245// Clean up if we failed otherwise tell the cmsd that we have a new file. Upon
246// failure we issue a a remove as we don't want the temp file to exist.
247//
248 xfrP->PFN[xfrP->pfnEnd] = '\0';
249 if (rc)
250 {if (doRM) Config.ossFS->Unlink(lfnpath);
251 ffMake(rc == -2);
252 if (rc == -2) {xfrP->RetCode = 2; retMsg = "file not found";}
253 else retMsg = "fetch failed";
254 } else if (Config.cmsPath) Config.cmsPath->Have(Lfn);
255
256// We completed, see if we need to do statistics
257//
259 || (Trace.What & TRACE_Debug))
260 {time_t eNow = time(0);
261 int inqT, xfrT;
262 inqT = static_cast<int>(xfrET - time_t(xfrP->reqData.addTOD));
263 if ((xfrT = static_cast<int>(eNow - xfrET)) <= 0) xfrT = 1;
264 if (((Config.xfrCmd[iXfr].Opts & Config.cmdStats)
265 || (Trace.What & TRACE_Debug)) && !retMsg)
266 {char sbuff[80];
267 sprintf(sbuff, "Got: %lld qt: %d xt: %d up: ", fSize, inqT, xfrT);
268 lfnpath[lfnEnd] = '\0';
269 Say.Say(0, sbuff, xfrP->reqData.User, " ", lfnpath);
270 }
272 {if (rc < 0) rc = -rc;
273 snprintf(lfnpath+lfnEnd, sizeof(lfnpath)-lfnEnd-1,
274 "\n&tod=%lld&sz=%lld&qt=%d&tm=%d&op=%c&rc=%d%s%s",
275 static_cast<long long>(eNow), fSize, inqT, xfrT,
276 xfrP->Act, rc, (pdSZ ? "&pd=" : ""), (pdSZ ? pdBuff : ""));
278 }
279 }
280
281// All done
282//
283 return retMsg;
284}
285
286/******************************************************************************/
287/* F e t c h D o n e */
288/******************************************************************************/
289
290const char *XrdFrmTransfer::FetchDone(char *lfnpath, struct stat &Stat, int &rc)
291{
292
293// If we are running in new mode, update file attributes
294//
295 rc = 0;
296 if (Config.runNew && Config.NeedsCTA(lfnpath))
298 cpyInfo.Attr.cpyTime = static_cast<long long>(Stat.st_mtime);
299 if ((rc = cpyInfo.Set(xfrP->PFN)))
300 Say.Emsg("Fetch", rc, "set copy time xattr on", xfrP->PFN);
301 }
302
303// Check for a lock file and if we have one, reset it's time or delete it
304//
305 if (Config.runOld && Config.NeedsCTA(lfnpath))
306 {struct stat lkfStat;
307 strcpy(&xfrP->PFN[xfrP->pfnEnd+5], ".lock");
308 if (!stat(xfrP->PFN, &lkfStat))
309 {if (Config.runNew && !rc) unlink(xfrP->PFN);
310 else {struct utimbuf tbuff;
311 tbuff.actime = tbuff.modtime = Stat.st_mtime;
312 if ((rc = utime(xfrP->PFN, &tbuff)))
313 Say.Emsg("Fetch", rc, "set utime on", xfrP->PFN);
314 }
315 }
316 }
317
318// Now rename the lfn to be what it needs to be in the end
319//
320 if (!rc && (rc=Config.ossFS->Rename(lfnpath,xfrP->reqData.LFN)))
321 Say.Emsg("Fetch", rc, "rename", lfnpath);
322 else XrdFrmCns::Add(xfrP->reqData.User, xfrP->reqData.LFN,
323 Stat.st_size, Stat.st_mode);
324
325// Done
326//
327 return (rc ? "Failed" : 0);
328}
329
330/******************************************************************************/
331/* Private: f f C h e c k */
332/******************************************************************************/
333
334const char *XrdFrmTransfer::ffCheck()
335{
336 const char *eTxt;
337
338// Generate proper fail file path and check if it exists
339//
340 if (Config.xfrFdir)
341 {char ffPath[MAXPATHLEN+8];
342 if (Config.xfrFdln+xfrP->pfnEnd+5 >= int(sizeof(ffPath))) return 0;
343 strcpy(ffPath, Config.xfrFdir);
344 strcpy(ffPath+Config.xfrFdln, xfrP->PFN);
345 strcpy(ffPath+Config.xfrFdln+xfrP->pfnEnd, ".fail");
346 eTxt = checkFF(ffPath);
347 } else {
348 strcpy(&xfrP->PFN[xfrP->pfnEnd], ".fail");
349 eTxt = checkFF(xfrP->PFN);
350 xfrP->PFN[xfrP->pfnEnd] = '\0';
351 }
352
353// Determine result
354//
355 if (eTxt) xfrP->RetCode = 1;
356 return eTxt;
357}
358
359/******************************************************************************/
360/* Private: f f M a k e */
361/******************************************************************************/
362
363void XrdFrmTransfer::ffMake(int nofile){
364 static const mode_t fMode = S_IRUSR|S_IWUSR|S_IRGRP|S_IROTH;
365 static const mode_t dMode = S_IXUSR|S_IWGRP|S_IXGRP|S_IXOTH | fMode;
366 char ffPath[MAXPATHLEN+8], *ffP;
367 int myFD;
368
369// Generate fail file path
370//
371 if (Config.xfrFdir)
372 {if (Config.xfrFdln+xfrP->pfnEnd+5 >= int(sizeof(ffPath))) return;
373 strcpy(ffPath, Config.xfrFdir);
374 strcpy(ffPath+Config.xfrFdln, xfrP->PFN);
375 strcpy(ffPath+Config.xfrFdln+xfrP->pfnEnd, ".fail");
376 XrdOucUtils::makePath(ffPath, dMode);
377 ffP = ffPath;
378 } else {
379 strcpy(&xfrP->PFN[xfrP->pfnEnd], ".fail");
380 ffP = xfrP->PFN;
381 }
382
383// Create a fail file and if failure is due to "file not found" set the mtime
384// to 2 so that the oss layer picks up the same error in the future.
385//
386 myFD = open(ffP, O_CREAT, fMode);
387 if (myFD >= 0)
388 {close(myFD);
389 if (nofile)
390 {struct utimbuf tbuff;
391 tbuff.actime = time(0); tbuff.modtime = 2;
392 utime(ffP, &tbuff);
393 }
394 }
395 if (!Config.xfrFdir) xfrP->PFN[xfrP->pfnEnd] = '\0';
396}
397
398/******************************************************************************/
399/* I n i t */
400/******************************************************************************/
401
402void *InitXfer(void *parg)
404 if (parg) xP->Start(*(int *)parg);
405 return (void *)0;
406}
407
409{
410 static int anyQ = XrdFrmXfrQueue::useAnyQ;
411 static int inpQ = XrdFrmXfrQueue::useInpQ;
412 static int outQ = XrdFrmXfrQueue::useOutQ;
413 void *qWant;
414 pthread_t tid;
415 int retc, n;
416
417// Initialize the cluster identification object first
418//
420
421// Initialize the transfer queue first
422//
423 if (!XrdFrmXfrQueue::Init()) return 0;
424
425// Start the required number of transfer threads. Note we can split these
426// as dedicated in threads and dedicated out threads.
427//
428 n = Config.xfrMax;
429 while(n--)
430 { if (Config.xfrMaxIn)
431 { qWant = (void *)&inpQ; Config.xfrMaxIn--;}
432 else if (Config.xfrMaxOt)
433 { qWant = (void *)&outQ; Config.xfrMaxOt--;}
434 else qWant = (void *)&anyQ;
435
436 if ((retc = XrdSysThread::Run(&tid, InitXfer, qWant,
437 XRDSYSTHREAD_BIND, "transfer")))
438 {Say.Emsg("main", retc, "create xfr thread"); return 0;}
439 }
440
441// All done
442//
443 return 1;
444}
445
446/******************************************************************************/
447/* Private: S e t u p C m d */
448/******************************************************************************/
449
450int XrdFrmTransfer::SetupCmd(XrdFrmTranArg *argP)
451{
452 char *pdata[XrdOucMsubs::maxElem + 2], *cP;
453 int pdlen[XrdOucMsubs::maxElem + 2], i, k, n;
454
456 Info(xfrP->reqData.User, argP->theEnv, Config.the_N2N,
457 xfrP->reqData.LFN+xfrP->reqData.LFO,
458 argP->theSrc, xfrP->reqData.Prty,
459 xfrP->reqData.Options & XrdFrcRequest::makeRW?O_RDWR:O_RDONLY,
460 argP->theMDP, xfrP->reqData.ID, xfrP->PFN, argP->theDst);
461
462// We must establish the host, cluster and instance name if we have one
463//
464 if (argP->theEnv)
465 {argP->theEnv->Put(SEC_HOST, Config.myName);
466 if (argP->theINS)
467 {CID.Get(argP->theINS, CMS_CID, argP->theEnv);
468 argP->theEnv->Put(XRD_INS, argP->theINS);
469 }
470 }
471
472// Substitute in the parameters
473//
474 k = argP->theVec->Subs(Info, pdata, pdlen);
475
476// Catenate all of the arguments
477//
478 *cmdBuff = '\0'; n = sizeof(cmdBuff) - 4; cP = cmdBuff;
479 for (i = 0; i < k; i++)
480 {n -= pdlen[i];
481 if (n < 0)
482 {Say.Emsg("Setup",E2BIG,"build command line for", xfrP->reqData.LFN);
483 return 0;
484 }
485 strcpy(cP, pdata[i]); cP += pdlen[i];
486 }
487
488// Now setup the command
489//
490 return (argP->theCmd->Setup(cmdBuff, &Say) == 0);
491}
492
493/******************************************************************************/
494/* Public: S t a r t */
495/******************************************************************************/
496
497void XrdFrmTransfer::Start(int ioqType)
498{
499 EPNAME("Transfer"); // Wrong but looks better
500 const char *Msg;
501
502// Prime I/O queue selection
503
504// Endless loop looking for transfer jobs
505//
506 while(1)
507 {xfrP = XrdFrmXfrQueue::Get(ioqType);
508
509 DEBUG(xfrP->Type <<" starting " <<xfrP->reqData.LFN
510 <<" for " <<xfrP->reqData.User);
511
512 Msg = (xfrP->qNum & XrdFrcRequest::outQ ? Throw() : Fetch());
513 if (Msg && !(xfrP->RetCode)) xfrP->RetCode = 1;
514 xfrP->PFN[xfrP->pfnEnd] = 0;
515
516 if (xfrP->RetCode || Config.Verbose)
517 {char buff1[280], buff2[80];
518 sprintf(buff1, "%s for %s", xfrP->RetCode ? "failed" : "complete",
519 xfrP->reqData.User);
520 if (xfrP->RetCode == 0) *buff2 = 0;
521 else sprintf(buff2, "; %s", (Msg ? Msg : "reason unknown"));
522 Say.Say(0, xfrP->Type, buff1, xfrP->reqData.LFN,buff2);
523 } else {
524 DEBUG(xfrP->Type
525 <<(xfrP->RetCode ? " failed " : " complete ")
526 << xfrP->reqData.LFN <<" rc=" <<xfrP->RetCode
527 <<' ' <<(Msg ? Msg : ""));
528 }
529
530 XrdFrmXfrQueue::Done(xfrP, Msg);
531 }
532}
533
534/******************************************************************************/
535/* Private: T r a c k D C */
536/******************************************************************************/
537
538int XrdFrmTransfer::TrackDC(char *Lfn, char *Mdp, char *Rfn)
539{
540 (void)Lfn;
541 char *FName, *Slash, *Slush = 0, *begRfn = Rfn;
542 int n = -1;
543
544// If this is a url, then don't back space into the url part
545//
546 if (*Rfn != '/'
547 && (Slash = index(Rfn, '/')) && *(Slash+1) == '/'
548 && (Slash = index(Slash+2, '/')) && *(Slash+1) == '/') begRfn = Slash+1;
549
550// Discard the filename component
551//
552 if (!(FName = rindex(begRfn, '/')) || FName == begRfn) return 0;
553 *FName = 0; Slash = Slush = FName;
554
555// Try to find the created directory path
556//
557 pMutex.Lock();
558 while(Slash != begRfn && !pTab.Find(Rfn))
559 {do {Slash--;} while(Slash != begRfn && *Slash != '/');
560 if (Slush) *Slush = '/';
561 *Slash = 0; Slush = Slash;
562 n++;
563 }
564 pMutex.UnLock();
565
566// Compute offset of uncreated part
567//
568 *Slash = '/';
569 if (Slash == begRfn) n = 0;
570 else n = (n >= 0 ? Slash - begRfn : FName - begRfn);
571 sprintf(Mdp, "%d", n);
572
573// All done
574//
575 return n;
576}
577
578/******************************************************************************/
579
580int XrdFrmTransfer::TrackDC(char *Rfn)
581{
582 char *Slash;
583
584// Trim off the trailing end
585//
586 if (!(Slash = rindex(Rfn, '/')) || Slash == Rfn) return 0;
587 *Slash = 0;
588
589// The path has been added, do insert it into the table of created paths
590//
591 pMutex.Lock();
592 pTab.Add(Rfn, 0, 0, Hash_data_is_key);
593 pMutex.UnLock();
594 *Slash = '/';
595 return 0;
596}
597
598/******************************************************************************/
599/* T h r o w */
600/******************************************************************************/
601
602const char *XrdFrmTransfer::Throw()
603{
604 XrdOucEnv myEnv(xfrP->reqData.Opaque?xfrP->reqData.LFN+xfrP->reqData.Opaque:0);
605 XrdFrmTranArg cmdArg(&myEnv);
606 struct stat begStat, endStat;
607 XrdFrmTranChk Chk(&begStat);
608 time_t xfrET;
609 const char *eTxt, *retMsg = 0;
610 char Rfn[MAXPATHLEN+256], *lfnpath = xfrP->reqData.LFN, *theDest;
611 char pdBuff[1024];
612 int isMigr = xfrP->reqData.Options & XrdFrcRequest::Migrate;
613 int iXfr, isURL, pdSZ, rc, mDP = -1;
614
615// The remote source is either the url-lfn or a translated lfn
616//
617 if ((isURL = xfrP->reqData.LFO)) theDest = xfrP->reqData.LFN;
618 else {if (!Config.RemotePath(xfrP->reqData.LFN, Rfn, sizeof(Rfn)))
619 return "lfn2rfn failed";
620 theDest = Rfn;
621 isURL = (*Rfn != '/');
622 }
623
624// Check if we can actually handle this transfer
625//
626 if (isURL)
627 {if (xfrCmd[3]) iXfr = 3;
628 else return "url copies not configured";
629 } else {
630 if (xfrCmd[1]) iXfr = 1;
631 else return "non-url copies not configured";
632 }
633
634// Check if the file exists (we only copy resident files)
635//
636 if (Config.Stat(lfnpath+xfrP->reqData.LFO, xfrP->PFN, &begStat))
637 return (xfrP->reqFQ ? "file not found" : 0);
638
639// Check for a fail file
640//
641 if ((eTxt = ffCheck())) return eTxt;
642
643// If this is an mss migration request, then recheck if the file can and
644// need to be migrated based on the lock file. This also obtains a directory
645// lock and lock file lock, as needed. If the file need not be migrated but
646// should be purge, we will get a null string error.
647//
648 if (isMigr && (eTxt = ThrowOK(&Chk)))
649 {if (*eTxt) return eTxt;
650 if (!(xfrP->reqData.Options & XrdFrcRequest::Purge)) return "logic error";
651 Throwaway();
652 return 0;
653 }
654
655// Setup the command, including directory tracking, as needed
656//
657 cmdArg.theCmd = xfrCmd[iXfr];
658 cmdArg.theVec = Config.xfrCmd[iXfr].theVec;
659 cmdArg.theDst = theDest;
660 cmdArg.theSrc = xfrP->PFN;
661 cmdArg.theINS = xfrP->reqData.iName;
662 if (Config.xfrCmd[iXfr].Opts & Config.cmdMDP)
663 mDP = TrackDC(lfnpath+xfrP->reqData.LFO, cmdArg.theMDP, Rfn);
664 if (!SetupCmd(&cmdArg)) return "outgoing transfer setup failed";
665
666// Setup program monitoring data
667//
668 pdSZ = (Config.xfrCmd[iXfr].Opts & Config.cmdXPD ? sizeof(pdBuff) : 0);
669
670// Now run the command to put the file. If the command fails and this is a
671// migration request, cretae a fail file if one does not exist.
672//
673 xfrET = time(0);
674 if ((rc = cmdArg.theCmd->Run(pdBuff, pdSZ)))
675 {if (isMigr) ffMake(rc == -2);
676 retMsg = "copy failed";
677 }
678
679// Track directory creations if we need to track them
680//
681 if (!rc && mDP >= 0) TrackDC(Rfn);
682
683// Obtain state of the file after the copy and make sure the file was not
684// modified during the copy. This is an error for queued requests but
685// internally generated requests will simply be retried.
686//
687 if (!rc)
688 {if ((rc = Config.Stat(lfnpath+xfrP->reqData.LFO, xfrP->PFN, &endStat)))
689 {Say.Emsg("Throw", lfnpath, "transferred but not found!");
690 retMsg = "unable to verify copy";
691 } else {
692 if (begStat.st_mtime != endStat.st_mtime
693 || begStat.st_size != endStat.st_size)
694 {Say.Emsg("Throw", lfnpath, "modified during transfer!");
695 retMsg = "file modified during copy"; rc = 1;
696 }
697 }
698 }
699
700// Purge the file if so wanted. Otherwise, if this is a migration request,
701// make sure that if a lock file exists its date/time is equal to the file
702// we just copied to prevent the file from being copied again (we have a lock).
703//
704 if (!rc)
705 {if (xfrP->reqData.Options & XrdFrcRequest::Purge) Throwaway();
706 else if (isMigr) ThrowDone(&Chk, endStat.st_mtime);
707 }
708
709// Do statistics if so wanted
710//
712 || (Trace.What & TRACE_Debug))
713 {time_t eNow = time(0);
714 int inqT, xfrT;
715 long long Fsize = begStat.st_size;
716 inqT = static_cast<int>(xfrET - time_t(xfrP->reqData.addTOD));
717 if ((xfrT = static_cast<int>(eNow - xfrET)) <= 0) xfrT = 1;
718 if (((Config.xfrCmd[iXfr].Opts & Config.cmdStats)
719 || (Trace.What & TRACE_Debug)) && !rc)
720 {char sbuff[80];
721 sprintf(sbuff, "Put: %lld qt: %d xt: %d up: ",Fsize,inqT,xfrT);
722 Say.Say(0, sbuff, xfrP->reqData.User, " ", xfrP->reqData.LFN);
723 }
725 {char monBuff[MAXPATHLEN+1024+512+8];
726 if (rc < 0) rc = -rc;
727 snprintf(monBuff, sizeof(monBuff),
728 "%s\n&tod=%lld&sz=%lld&qt=%d&tm=%d&op=%c&rc=%d%s%s",
729 xfrP->reqData.LFN, static_cast<long long>(eNow), Fsize,
730 inqT, xfrT, xfrP->Act, rc,
731 (pdSZ ? "&pd=" : ""), (pdSZ ? pdBuff : ""));
733 }
734 }
735
736// All done
737//
738 return retMsg;
739}
740
741/******************************************************************************/
742/* Private: T h r o w a w a y */
743/******************************************************************************/
744
745void XrdFrmTransfer::Throwaway()
746{
747 EPNAME("Throwaway");
748
749// Purge the file. We do this via the pfn but also indicate we want all
750// migration support suffixes removed it they exist. Notify the cmsd & cnsd.
751//
752 if (Config.Test) {DEBUG("Would have removed '" <<xfrP->PFN <<"'");}
754 DEBUG("removed '" <<xfrP->PFN <<"'");
755 if (Config.cmsPath) Config.cmsPath->Gone(xfrP->PFN);
756 XrdFrmCns::Rm(xfrP->PFN);
757 }
758}
759
760/******************************************************************************/
761/* Private: T h r o w D o n e */
762/******************************************************************************/
763
764void XrdFrmTransfer::ThrowDone(XrdFrmTranChk *cP, time_t endTime)
765{
766
767// Update file attributes if we are running in new mode, otherwise do
768//
769 if (Config.runNew)
771 cpyInfo.Attr.cpyTime = static_cast<long long>(endTime);
772 if (cpyInfo.Set(xfrP->PFN, cP->lkfd))
773 Say.Emsg("Throw", "Unable to set copy time xattr for", xfrP->PFN);
774 else if (cP->lkfx)
775 {strcpy(&xfrP->PFN[xfrP->pfnEnd], ".lock");
776 unlink(xfrP->PFN);
777 xfrP->PFN[xfrP->pfnEnd] = '\0';
778 }
779 } else {
780 struct stat Stat;
781 strcpy(&xfrP->PFN[xfrP->pfnEnd], ".lock");
782 if (!stat(xfrP->PFN, &Stat))
783 {struct utimbuf tbuff;
784 tbuff.actime = tbuff.modtime = endTime;
785 if (utime(xfrP->PFN, &tbuff))
786 Say.Emsg("Throw", errno, "set utime for", xfrP->PFN);
787 }
788 xfrP->PFN[xfrP->pfnEnd] = '\0';
789 }
790}
791
792/******************************************************************************/
793/* Private: T h r o w O K */
794/******************************************************************************/
795
796const char *XrdFrmTransfer::ThrowOK(XrdFrmTranChk *cP)
797{
798 class fdClose
799 {public:
800 int Num;
801 fdClose() : Num(-1) {}
802 ~fdClose() {if (Num >= 0) close(Num);}
803 } fnFD;
804
806 struct stat lokStat;
807 int statRC;
808
809// Check if the file is in use by checking if we got an exclusive lock
810//
811 if ((fnFD.Num = XrdSysFD_Open(xfrP->PFN, O_RDWR)) < 0)
812 return "unable to open file";
813 if (XrdOucSxeq::Serialize(fnFD.Num,XrdOucSxeq::noWait)) return "file in use";
814
815// Get the info on the lock file (enabled if old mode is in effect
816//
817 if (Config.runOld)
818 {strcpy(&xfrP->PFN[xfrP->pfnEnd], ".lock");
819 statRC = stat(xfrP->PFN, &lokStat);
820 xfrP->PFN[xfrP->pfnEnd] = '\0';
821 } else statRC = 1;
822 if (statRC && !Config.runNew) return "missing lock file";
823
824// If running in new mode then we must get the extended attribute for this file
825// unless we got the lock file time which takes precendence.
826//
827 if (Config.runNew)
828 {if (!statRC)
829 cpyInfo.Attr.cpyTime = static_cast<long long>(lokStat.st_mtime);
830 else if (cpyInfo.Get(xfrP->PFN, fnFD.Num) <= 0)
831 return "unable to get copy time xattr";
832 }
833
834// Verify the information
835//
836 if (cpyInfo.Attr.cpyTime >= static_cast<long long>(cP->Stat->st_mtime))
837 {if (xfrP->reqData.Options & XrdFrcRequest::Purge) return "";
838 return "already migrated";
839 }
840
841// Keep the lock on the base file until we are through. No one is allowed to
842// modify this file until we have migrate it.
843//
844 cP->lkfd = fnFD.Num;
845 cP->lkfx = statRC == 0;
846 fnFD.Num = -1;
847 return 0;
848}
#define DEBUG(x)
#define EPNAME(x)
struct stat Stat
Definition XrdCks.cc:49
XrdOucPup XrdCmsParser::Pup & Say
#define TRACE_Debug
void * InitXfer(void *parg)
char PFN[MAXPATHLEN+16]
const char * Type
XrdFrcRequest reqData
XrdFrcReqFile * reqFQ
#define XRDOSS_isPFN
Definition XrdOss.hh:469
#define XRDOSS_isMIG
Definition XrdOss.hh:470
#define XRDOSS_mkpath
Definition XrdOss.hh:466
@ Hash_data_is_key
Definition XrdOucHash.hh:52
#define SEC_HOST
#define CMS_CID
#define XRD_INS
#define close(a)
Definition XrdPosix.hh:48
#define open
Definition XrdPosix.hh:76
#define unlink(a)
Definition XrdPosix.hh:113
#define stat(a, b)
Definition XrdPosix.hh:101
XrdOucString Path
size_t strlcpy(char *dst, const char *src, size_t sz)
#define XRDSYSTHREAD_BIND
const kXR_char XROOTD_MON_MAPMIGR
const kXR_char XROOTD_MON_MAPSTAG
int Get(const char *iName, char *buff, int blen)
Definition XrdFrcCID.cc:124
int Init(const char *qPath)
Definition XrdFrcCID.cc:159
char LFN[3072]
static const int Purge
static const int makeRW
static const int outQ
static const int Migrate
signed char Prty
long long addTOD
static void Rm(const char *Path, int islfn=0)
Definition XrdFrmCns.hh:53
static void Add(const char *tID, const char *Path, long long Size, mode_t Mode)
Definition XrdFrmCns.cc:67
struct XrdFrmConfig::Cmd xfrCmd[4]
int NeedsCTA(const char *Lfn)
XrdNetCmsNotify * cmsPath
static const int cmdStats
static const int cmdAlloc
static const int cmdRME
int RemotePath(const char *oldp, char *newp, int newpsz)
XrdOss * ossFS
XrdOucMsubs * theVec
static const int cmdXPD
int Stat(const char *xLfn, const char *xPfn, struct stat *buff)
static const int cmdMDP
XrdOucName2Name * the_N2N
const char * myName
static kXR_unt32 Map(char code, const char *uname, const char *path)
static char monMIGR
static char monSTAGE
static int Init()
static const char * checkFF(const char *Path)
void Start(int ioqType)
static const int useInpQ
static void Done(XrdFrmXfrJob *xP, const char *Msg)
static XrdFrmXfrJob * Get(int ioQType)
static int Init()
static const int useOutQ
static const int useAnyQ
int Have(const char *Path, int isPfn=1)
int Gone(const char *Path, int isPfn=1)
virtual int Create(const char *tid, const char *path, mode_t mode, XrdOucEnv &env, int opts=0)=0
virtual int Rename(const char *oPath, const char *nPath, XrdOucEnv *oEnvP=0, XrdOucEnv *nEnvP=0)=0
virtual int Unlink(const char *path, int Opts=0, XrdOucEnv *envP=0)=0
void Put(const char *varname, const char *value)
Definition XrdOucEnv.hh:85
T * Add(const char *KeyVal, T *KeyData, const int LifeTime=0, XrdOucHash_Options opt=Hash_default)
T * Find(const char *KeyVal, time_t *KeyTime=0)
int Subs(XrdOucMsubsInfo &Info, char **Data, int *Dlen)
static const int maxElem
int Setup(const char *prog, XrdSysError *errP=0, int(*Proc)(XrdOucStream *, char **, int)=0)
int Serialize(int Opts=0)
static const int noWait
Definition XrdOucSxeq.hh:37
static int makePath(char *path, mode_t mode, bool reset=false)
int Get(const char *Path, int fd=-1)
int Set(const char *Path, int fd=-1)
static int Run(pthread_t *, void *(*proc)(void *), void *arg, int opts=0, const char *desc=0)
XrdOucTrace Trace
XrdFrcCID CID
Definition XrdFrcCID.cc:56
XrdFrmConfig Config
XrdOucEnv * theEnv
XrdFrmTranArg(XrdOucEnv *Env)
XrdOucMsubs * theVec
XrdOucProg * theCmd
struct stat * Stat
XrdFrmTranChk(struct stat *sP)