#!/usr/bin/env python
# Parallel cp program
# Copyright (c) Genome Research Ltd 2012
# Author Guy Coates <[email protected]>
# This program is released under GNU Public License V2 (GPLv2)
""" This program copies a directory tree in parallel.
Algorithm:
pcp runs in two phases. Phase I is a parallel walk of the file tree, involving all
MPI ranks in a peer-to-peer algorithm. The walk constructs the list of files to be
copied and creates the destination directory hierarchy.
In Phase II, the actual files are copied. Phase II uses a master-slave algorithm.
R0 is the master and dispatches file copy instructions to the slaves (R1...Rn).
Although slightly less efficient than the peer-to-peer algorithm in phase I,
using master-slave simplifies the checkpoint/restore implementation.
As stat is a relatively slow operation on lustre, the code jumps through some
hoops to avoid doing stats during Phase I.
"""
#import rpdb2
#rpdb2.start_embedded_debugger("XXXX", fAllowRemote=True,timeout=10)
import argparse
import hashlib
import fnmatch
import os
import stat
import sys
import traceback
import time
import ctypes
import sqlite3
import pickle
import math
import random
import signal
import gzip
try:
from pcplib import lustreapi
WITHLUSTRE = True
except Exception:
WITHLUSTRE = False
from pcplib import parallelwalk
from pcplib import statfs
from pcplib import safestat
from collections import deque
from mpi4py import MPI
import pkg_resources
import errno
try:
__version__ = pkg_resources.require("pcp")[0].version
except pkg_resources.DistributionNotFound:
__version__ = "UNRELEASED"
clib = ctypes.CDLL("libc.so.6", use_errno=True)
class Timer:
"""Simple timer / stopwatch class."""
def __init__(self):
self.running = False
self.elapsedtime = 0
self.stoptime = 0
self.starttime = 0
def reset(self):
"""Reset the timer to 0."""
self.__init__()
def start(self):
"""Start the timer."""
self.running = True
self.starttime = time.time()
def stop(self):
"""Stop the timer."""
self.running = False
self.stoptime = time.time()
self.elapsedtime += self.stoptime - self.starttime
def read(self):
"""Get the elapsed time."""
if self.running:
return (time.time() - self.starttime) + self.elapsedtime
else:
return self.elapsedtime
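# Typical use: t = Timer(); t.start(); ...; t.stop(); elapsed = t.read()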
def timestamp():
return time.strftime("%b %d %H:%M:%S")
def createDB():
# This database holds all the information about files to be copied,
# their checksums as well as the state of the copy.
# State
# 0 Not copied.
# 1 Dispatched for copy.
# 2 Copy complete.
# 3 Dispatched for md5
# 4 md5 complete
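# 5 Dispatched for verification md5 (-Rv runs)
# 6 Verification md5 complete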
filedb = sqlite3.connect(":memory:")
filedb.text_factory = str
filedb.execute("""CREATE TABLE FILECPY(
ID INTEGER PRIMARY KEY AUTOINCREMENT,
SORTORDER INTEGER DEFAULT -1,
FILENAME TEXT,
STATE INTEGER DEFAULT 0,
SRCMD5 TEXT,
SIZE INTEGER,
CHUNKS INTEGER DEFAULT -1,
ATTEMPTS INTEGER DEFAULT 0,
LASTRANK INTEGER DEFAULT 0)""")
filedb.execute("""CREATE INDEX COPY_IDX ON FILECPY(STATE, SORTORDER, LASTRANK)""")
# Table to hold program arguments
filedb.execute("""CREATE TABLE ARGUMENTS(
ID INTEGER PRIMARY KEY AUTOINCREMENT,
ARGS BLOB)""")
return(filedb)
# Dump the database out to disk
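# (written to a temporary "__PARTIAL__" file and renamed into place so an
# interrupted dump cannot clobber an existing checkpoint)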
def dumpDB(statedb, filename):
tmpfile = filename+"__PARTIAL__"
dbfile = gzip.open(tmpfile, "wb")
for l in statedb.iterdump():
dbfile.write(l + "\n")
dbfile.close()
os.rename(tmpfile, filename)
# Restore the database state from a previous run so we
# can resume a copy.
def restoreDB(filename):
filedb = sqlite3.connect(":memory:")
filedb.text_factory = str
dumpfile = gzip.open(filename, "rb")
filedb.executescript(dumpfile.read())
filedb.commit()
dumpfile.close()
argp = filedb.execute("SELECT ARGS FROM ARGUMENTS WHERE ID == 1").fetchone()
args = pickle.loads(argp[0])
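# Re-queue work that was in flight when the checkpoint was written:
# dispatched copies (state 1) revert to "not copied" (0) and dispatched
# md5s (state 3) revert to "copy complete" (2).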
filedb.execute("UPDATE FILECPY SET STATE = 0 WHERE STATE = 1;")
filedb.execute("UPDATE FILECPY SET STATE = 2 WHERE STATE = 3;")
filedb.execute("UPDATE FILECPY SET ATTEMPTS = 0;")
filedb.execute("UPDATE FILECPY SET LASTRANK = 0;")
return(filedb, args)
def parseargs():
parser = MPIargparse(description=
"Copy a directory tree in parallel",
formatter_class =
argparse.RawDescriptionHelpFormatter,
epilog="""
This program traverses a directory tree and copies files in the tree in
parallel. It does not copy individual files in parallel. It should be invoked
via mpirun.
If run with the -l or -lf flag, pcp will be stripe aware. -l will cause
stripe information to be copied from the source files and directories. -lf will
cause all files and directories on the destination to be striped, regardless of
the striping on the source.
Striping behaviour can be further modified with -ls and -ld. A minimum file size
can be set with -ls. Files below this size will not be striped, regardless of
the source striping. -ld will cause all directories to be unstriped.
-l requires that the source and destination filesystems must be lustre.
-lf can be used when only the destination filesystem is lustre.
For maximum efficiency, ensure tasks are spread across as many different
machines as possible to prevent local network bottlenecks.
""")
parser.add_argument("SOURCE", help="source directory", nargs="?")
parser.add_argument("DEST", help="destination directory", nargs="?")
parser.add_argument("-b",
help="Copy files larger than C Mbytes in C Mbyte chunks",
default=500, type=int, metavar="C")
parser.add_argument("-c", help="verify copy with checksum", default=False,
action="store_true")
parser.add_argument("-d", help="dead worker timeout (seconds)", default=10,
type=int)
parser.add_argument("-g", help="only copy files matching glob",
default=None)
parser.add_argument("-i",
help=("Create incremental backup with hard links to PREVBKUP."
" Files are compared by mode, owner, mtime and size."
" If a source file matches the corresponding file in directory PREVBKUP,"
" the file in PREVBKUP is hard linked to the destination."
" Otherwise a source file with no matching destination file"
" is copied from source to destination."),
type=str, metavar="PREVBKUP", default=None)
parser.add_argument("-n", "--dry-run",
help="perform a trial run with no copies made",
action="store_true", default=False)
parser.add_argument("-t",
help="retry file copies N times in case of IO errors",
type=int, metavar="N", default=3)
parser.add_argument("-p",
help=("preserve permissions and timestamps,"
" and ownership if running as root"),
default=False, action="store_true")
parser.add_argument("-v", help="verbose", default=False,
action="store_true")
parser.add_argument("-V", "--version", help="print version number",
action='version',
version=os.path.basename(sys.argv[0]) + \
" version " + __version__)
group = parser.add_mutually_exclusive_group()
group.add_argument("-l", help="copy lustre stripe information",
default=False, action="store_true")
group.add_argument("-lf",
help=("Force striping of all files and directories. Can be combined"
" with -ls and -ld."), default=False, action="store_true")
parser.add_argument("-ls",
help=("do not stripe files smaller than B "
"bytes. Implies -l. Size can be suffixed"
"with k,M,G,T,P"), metavar="B", default=0)
parser.add_argument("-ld",
help="Do not stripe diretories.", default=False,
action="store_true")
parser.add_argument("-u",
help="Copy only when the source file is newer than the destination file,"
" or the destination file is missing.", default=False, action="store_true")
parser.add_argument("-R",
help=("Restart a copy from a checkpoint file DUMPFILE."),
type=str, metavar="DUMPFILE", default=None)
parser.add_argument("-Rv",
help=("Verify previous copy from a checkpoint file DUMPFILE."),
type=str, metavar="DUMPFILE", default=None)
parser.add_argument("-K",
help=("Enable and write checkpoints to file DUMPFILE."),
type=str, metavar="DUMPFILE", default=None)
parser.add_argument("-Km",
help=("Checkpoint every N minutes."),
type=int, metavar="N", default=60)
parser.add_argument("-Kx",
help=("Checkpoint before exit to retain history of transfer"),
default=False, action="store_true")
if len(sys.argv) == 1:
parser.print_help()
Abort()
args = parser.parse_args()
# setting blocksize = 0 will disable chunk copying.
if args.b == 0:
args.b = INFINITY
if not args.SOURCE and not (args.R or args.Rv):
print "You must specify a source directory!"
parser.print_help()
Abort()
if not args.DEST and not (args.R or args.Rv):
print "You must specify a destination directory!"
parser.print_help()
Abort()
if args.ls != 0:
args.ls = SIConvert(args.ls)
if args.ls == -1:
print "Error: incorrect size specification."
Abort()
return(args)
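# Illustrative checkpoint/restart invocations (paths are examples only):
#   mpirun -np 8 pcp -K /tmp/pcp.ckpt -Km 30 /src /dest   # checkpoint every 30 minutes
#   mpirun -np 8 pcp -R /tmp/pcp.ckpt                     # resume an interrupted copy
#   mpirun -np 8 pcp -Rv /tmp/pcp.ckpt                    # verify a completed copy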
def Abort():
if rank == 0 and STARTEDCOPY and DUMPDB:
try:
print "Attempting write state database to %s..." %(DUMPDB),
dumpDB(statedb, DUMPDB)
print "Done."
except IOError as dberr:
print "FAILED!"
print dberr
"""Clean down all the MPI Processes."""
MPI.COMM_WORLD.Abort(1)
exit (1)
def sanitycheck(sourcedir, destdir):
"""Perform some sanity checks, including creating the destination
directory if it does not exist and ensuring excessive parallelism is not
used."""
realsource = os.path.realpath(sourcedir)
realdest = os.path.realpath(destdir)
if realsource == realdest:
print
print ("ERROR: Source and destination directory are the same!")
print
Abort()
if not WITHLUSTRE and (LSTRIPE or FORCESTRIPE or NODIRSTRIPE or MINSTRIPESIZE):
print
print ("Error: Lustre stripe options specified but lustreapi is not available.")
print
Abort()
# If we are using lustre stripe options, check that the source and
# destination filesystems are lustre.
if LSTRIPE:
fstype = statfs.fstype(sourcedir)
if fstype != lustreapi.LUSTREMAGIC:
print ""
print ("ERROR: You have asked me to copy lustre striping attributes, but"
" %s is not a lustre directory.") % sourcedir
print "Exiting."
Abort()
# The destination might not exist yet, so walk up the path until we find the
# mountpoint.
if LSTRIPE or FORCESTRIPE:
path = realdest
while not os.path.ismount(path):
path = os.path.dirname(path)
fstype = statfs.fstype(path)
if fstype != lustreapi.LUSTREMAGIC:
print ""
print ("ERROR: You have asked me to set lustre striping attributes, but"
"%s is not a lustre filesystem") % path
print "Exiting."
Abort()
def scantree(sourcedir, destdir, statedb):
"""walk the src file tree, create the destination directories and put the
files to be copied into the database."""
totaldirs = 0
totalscanned = 0
if rank == 0:
startime = time.time()
if not os.path.isdir(sourcedir):
print "R%i: Error: %s not a directory" % (rank, sourcedir)
Abort()
# results are ([directories][files to be copied ][total files])
# FIXME: change to a proper data structure.
walker = copydirtree(comm, results=[[],[],0])
listofpaths = walker.Execute(sourcedir)
if rank == 0:
for l in listofpaths:
for f in l[1]:
statedb.execute("""INSERT INTO FILECPY (FILENAME) VALUES (?)""",
(f,))
totaldirs += len(l[0])
totalscanned += l[2]
endtime = time.time()
walltime = endtime - startime
totalfiles = statedb.execute("SELECT COUNT(*) FROM FILECPY").fetchone()[0]
rate = (totalfiles + totaldirs) / walltime
walltime = time.strftime("%H hrs %M mins %S secs",
time.gmtime(walltime))
print ("Phase I done: Scanned %i files, %i dirs in %s"
" (%.0f items/sec)."
% (totalscanned, totaldirs, walltime, rate))
print " %i files will be copied." %totalfiles
# Shuffle rows. If we don't do this, chunks of files tend to be copied at
# the same time, causing hot OSTs in the case of unstriped files.
statedb.execute("""UPDATE FILECPY SET SORTORDER = ABS(RANDOM() % ?)""",
(totalfiles,))
return()
def fadviseSeqNoCache(fileD):
"""Advise the kernel that we are only going to access file-descriptor
fileD once, sequentially."""
POSIX_FADV_SEQUENTIAL = 2
POSIX_FADV_DONTNEED = 4
offset = ctypes.c_int64(0)
length = ctypes.c_int64(0)
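# An offset and length of 0 apply the advice to the entire file.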
clib.posix_fadvise(fileD, offset, length, POSIX_FADV_SEQUENTIAL)
clib.posix_fadvise(fileD, offset, length, POSIX_FADV_DONTNEED)
def md5copy(src, dst, blksize, MD5SUM, chunk):
"""Combined copy / md5 calcuation function. Copies data from src to dst in
blksize chunks. If MD5SUM is true, it also calculates the md5sum of the
source file. Returns the md5sum of the source and the number of bytes copied."""
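# A negative chunk means copy the whole file; chunk >= 0 copies only the
# chunk-th CHUNKSIZE-byte slice into a destination file that already exists.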
md5hash = hashlib.new("md5")
bytescopied = 0
infile = open(src, "rb")
if chunk < 0:
# Copy the file in one go:
outfile = open(dst, "wb")
fadviseSeqNoCache(infile.fileno())
fadviseSeqNoCache(outfile.fileno())
while True:
data = infile.read(blksize)
if not data:
break
outfile.write(data)
bytescopied += len(data)
if MD5SUM:
md5hash.update(data)
else:
# copy CHUNKSIZE bytes:
outfile = open(dst, "r+")
fadviseSeqNoCache(infile.fileno())
fadviseSeqNoCache(outfile.fileno())
infile.seek(chunk*CHUNKSIZE)
outfile.seek(chunk*CHUNKSIZE)
nreads, remainder = divmod(CHUNKSIZE, blksize)
for i in xrange(nreads):
data = infile.read(blksize)
outfile.write(data)
bytescopied += len(data)
if MD5SUM:
md5hash.update(data)
if remainder > 0:
data = infile.read(remainder)
outfile.write(data)
bytescopied += len(data)
if MD5SUM:
md5hash.update(data)
infile.close()
outfile.close()
digest = md5hash.hexdigest()
if not MD5SUM:
digest = None
return(digest, bytescopied)
def createstripefile(src, dst, size):
"""Create a file dst with the lustre stripe information copied from src, unless
filesystem is < size, in which case we set the striping to 1."""
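# Returned stripestatus: 1 = destination striped, 0 = striping not applied,
# -1 = file smaller than MINSTRIPESIZE so striping was skipped.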
stripestatus = 0
if LSTRIPE:
layout = lustreapi.getstripe(src)
if (LSTRIPE and layout.isstriped()) or FORCESTRIPE:
if size < MINSTRIPESIZE:
stripestatus = -1
count = 1
else:
stripestatus = 1
count = -1
else:
count = 1
if not DRYRUN:
try:
lustreapi.setstripe(dst, stripecount=count)
except IOError as error:
if error.errno == errno.EEXIST:
# file exists; blow it away and try again...
os.remove(dst)
lustreapi.setstripe(dst, stripecount=count)
else:
raise
return(stripestatus)
def calcmd5(filename, chunk):
"""calculate the md5sum of a file. Returns a tuple of (md5sum,amount of
data checksummed), or (None,0) in the case of symlinks."""
md5hash = hashlib.new("md5")
# Use the optimal blocksize for IO.
filestat = safestat.safestat(filename)
blksize = filestat.st_blksize
mode = filestat.st_mode
byteschecked = 0
if stat.S_ISLNK(mode):
return(None, byteschecked)
fh = open(filename, "rb")
fadviseSeqNoCache(fh.fileno())
# MD5 the whole file
if chunk < 0:
while True:
data = fh.read(blksize)
byteschecked += len(data)
if not data:
break
md5hash.update(data)
else:
#MD5 just our chunk
fh.seek(chunk*CHUNKSIZE)
nreads, remainder = divmod(CHUNKSIZE, blksize)
for i in xrange(nreads):
data = fh.read(blksize)
md5hash.update(data)
byteschecked += len(data)
if remainder > 0:
data = fh.read(remainder)
md5hash.update(data)
byteschecked += len(data)
fh.close()
digest = md5hash.hexdigest()
return(digest, byteschecked)
def ConsumeWork(sourcedir, destdir):
"""Listen for work from the dispatcher and copies/md5sums files as
appropriate. When send the SHUTDOWN message the worker will send
performance stats back to the master."""
filescopied = 0
md5done = 0
bytescopied = 0
byteschksummed = 0
md5timer = Timer()
copytimer = Timer()
# Poll for work.
while True:
msg = comm.recv(source=0, tag=1)
action = msg[0]
if action == "SHUTDOWN":
break
(filename, idx, chunk) = msg[1]
md5sum = None
destination = mungePath(sourcedir, destdir, filename)
if action == "COPY":
copytimer.start()
try:
size, speed, md5sum, stripestatus, status = \
copyFile(filename, destination, chunk)
except (IOError, OSError) as error:
speed = 0
size = 0
stripestatus = 0
# permission denied errors are not fatal. Skip over the file
# and carry on.
if error.errno == errno.EACCES:
status = 3
# File might have moved whilst we copied it!
elif error.errno == errno.ENOENT:
status = 5
else:
status = 1
if status == 0 or status == 4 or status == 7:
bytescopied += size
filescopied += 1
msg = ("COPYRESULT",( md5sum, idx, rank, status, speed,
size, stripestatus))
comm.send(msg, dest=0, tag=1)
copytimer.stop()
if action == "MD5":
md5timer.start()
if DRYRUN:
size = 0
status = 0
md5sum = "DEADBEAFdeadbeafDEADBEAFdeadbeaf"
else:
try:
md5sum, size = calcmd5(destination, chunk)
status = 0
except (IOError, OSError):
size = 0
status = 1
msg = ("MD5RESULT", (md5sum, idx, rank, status, None, None, None))
comm.send(msg, dest=0, tag=1)
md5done += 1
byteschksummed += size
md5timer.stop()
# Return stats
comm.gather((filescopied, md5done, bytescopied, byteschksummed,
copytimer.read(), md5timer.read()), root=0)
return(0)
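# Message protocol between the dispatcher (R0) and the workers. All messages
# are picklable tuples of (ACTION, payload):
#   master -> worker, tag=1: ("COPY"|"MD5", (filename, id, chunk)) or ("SHUTDOWN", ())
#   worker -> master, tag=1: ("COPYRESULT"|"MD5RESULT",
#                             (md5sum, id, rank, status, speed, size, stripestatus))
#   worker -> master, tag=3: ("ALIVE", (rank,)) during the start-up liveness check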
def checkAlive(rank, workers, timeout):
"""Quirky farm nodes can cause the MPI runtime to lock up during the task
spawn. This routine checks whether nodes can exchange messages. If a node
has not responded after timeout seconds we bail."""
if rank > 0:
msg = ("ALIVE", (rank,))
comm.send(msg, dest=0, tag=3)
else:
expectedworkers = set(range(1, workers))
aliveworkers = set()
giveuptime = time.time() + timeout
while time.time() < giveuptime:
if comm.Iprobe(source=MPI.ANY_SOURCE, tag=3):
msg = comm.recv(source=MPI.ANY_SOURCE, tag=3)
status = msg[0]
rank = msg[1][0]
aliveworkers.add(rank)
if len(aliveworkers) == len(expectedworkers):
print "R0: All workers have reported in."
return
print ("Error: The following workers did not report in after"
" %i seconds") % timeout
awol = expectedworkers.difference(aliveworkers)
for i in awol:
print "R%i" % i
Abort()
def DispatchWork(statedb):
"""The dispatcher sends copy/md5 tasks out to idle workers. If copy/md5
tasks fail the dispatcher will re-queue them for retries."""
global WARNINGS
global CHECKPOINTNOW
global COPYREMAINS
global MD5REMAINS
global TOTALROWS
global RVERRORS
# Queue containing workers that are ready for work.
idleworkers = deque()
idleworkers.extend(range(1, workers))
# Start the checkpoint timer
if DUMPDB:
cptimer = Timer()
cptimer.start()
TOTALROWS = statedb.execute \
("""SELECT COUNT(*) FROM FILECPY""").fetchone()[0]
if VERIFY:
COPYREMAINS = 0
MD5REMAINS = statedb.execute \
("""SELECT COUNT(*) FROM FILECPY WHERE STATE == 4""").fetchone()[0]
for errfile, chunk in statedb.execute \
("SELECT FILENAME, CHUNKS FROM FILECPY WHERE STATE < 4"):
RVERRORS += 1
destfile = mungePath(sourcedir, destdir, errfile)
if chunk < 0:
print "COPYFAIL:%s" % destfile
else:
print "COPYFAIL,%d:%s" % (chunk,destfile)
else:
COPYREMAINS = statedb.execute \
("""SELECT COUNT(*) FROM FILECPY WHERE STATE == 0""").fetchone()[0]
if MD5SUM:
MD5REMAINS = statedb.execute \
("""SELECT COUNT(*) FROM FILECPY WHERE STATE < ?""",(ENDSTATE,)).fetchone()[0]
else:
MD5REMAINS = 0
# loop until we have no more work to send.
while COPYREMAINS > 0 or MD5REMAINS > 0:
# See if we need to checkpoint
if DUMPDB and not VERIFY:
if cptimer.read() > DUMPINTERVAL:
print "RO: Writing checkpoint to %s..." %DUMPDB,
dumpDB(statedb, DUMPDB)
print "Done"
cptimer.reset()
cptimer.start()
if CHECKPOINTNOW and not VERIFY:
if not DUMPDB:
dumpfile = "pcp_checkpoint.db"
else:
dumpfile = DUMPDB
print "R0: SIGUSR1: Writing checkpoint to %s..." %dumpfile,
dumpDB(statedb, dumpfile)
print "Done"
CHECKPOINTNOW = False
# Listen for workers reporting in and deal with the results
if comm.Iprobe(source=MPI.ANY_SOURCE, tag=1):
msg = comm.recv(source=MPI.ANY_SOURCE, tag=1)
action = msg[0]
payload = msg[1]
workerrank = msg[1][2]
idleworkers.appendleft(workerrank)
if action == "COPYRESULT":
processCopy(statedb, payload)
if action == "MD5RESULT":
processMD5(statedb, payload)
# try for dispatch
if len(idleworkers) > 0:
worker = idleworkers.pop()
if VERIFY:
task = statedb.execute("SELECT FILENAME, ID, CHUNKS FROM FILECPY WHERE STATE == 4 ORDER BY SORTORDER LIMIT 1").fetchone()
if task:
statedb.execute("""UPDATE FILECPY SET STATE = 5 WHERE ID = ?""",(task[1],))
msg = ("MD5", (task[0], task[1], task[2]))
comm.send(msg, dest=worker, tag=1)
continue
else:
# 2 workers is a special case; we can't do the MD5sum or retries on
# a different node, as we only have 1 worker node.
if workers == 2:
lastrank = -1
else:
lastrank = worker
task = statedb.execute("""SELECT FILENAME, ID, CHUNKS FROM FILECPY WHERE STATE == 0 AND
LASTRANK <> ? ORDER BY SORTORDER LIMIT 1""",(lastrank, )).fetchone()
if task:
statedb.execute("""UPDATE FILECPY SET STATE = 1 WHERE ID = ?""",(task[1],))
msg = ("COPY", (task[0], task[1], task[2]))
comm.send(msg, dest=worker, tag=1)
continue
if MD5SUM:
task = statedb.execute("""SELECT FILENAME, ID, CHUNKS FROM FILECPY WHERE STATE == 2 AND
LASTRANK <> ? ORDER BY SORTORDER LIMIT 1""",(lastrank, )).fetchone()
if task:
statedb.execute("""UPDATE FILECPY SET STATE = 3 WHERE ID = ?""",(task[1],))
msg = ("MD5", (task[0], task[1], task[2]))
comm.send(msg, dest=worker, tag=1)
continue
# There is work, but not for this worker. Send to the back of the queue
idleworkers.appendleft(worker)
if VERBOSE:
print "R0: No more work to do."
def processMD5(statedb, payload):
global WARNINGS
global COPYREMAINS
global MD5REMAINS
global RVERRORS
md5sum = payload[0]
idx = payload[1]
workerrank = payload[2]
status = payload[3]
speed = payload[4]
size = payload[5]
stripestatus = payload[6]
filename, attempt, srcmd5, chunk = statedb.execute("""SELECT FILENAME, ATTEMPTS, SRCMD5,
CHUNKS FROM FILECPY WHERE ID = ?""", (idx,)).fetchone()
if status == 0:
if VERIFY:
MD5REMAINS -= 1
statedb.execute("UPDATE FILECPY SET STATE = 6 WHERE ID = ?", (idx,))
if srcmd5 != md5sum:
RVERRORS += 1
destfile = mungePath(sourcedir, destdir, filename)
if chunk < 0:
print "MD5FAIL:%s" % destfile
else:
print "MD5FAIL,%d:%s" % (chunk,destfile)
elif srcmd5 == md5sum:
statedb.execute("""UPDATE FILECPY SET STATE = 4
WHERE ID = ?""", (idx,))
MD5REMAINS -= 1
if VERBOSE:
if chunk < 0:
print "R%i: %s %s md5sum verified (%s)" \
% (workerrank, timestamp(), filename, md5sum)
else:
print "R%i: %s %s chunk %i md5sum verified (%s)" \
% (workerrank, timestamp(), filename, chunk, md5sum)
else:
# This is bad; we got an md5 mismatch, but no IO
# exceptions were thrown.
attempt += 1
statedb.execute("""UPDATE FILECPY SET STATE = 0, SRCMD5 = NULL, ATTEMPTS = ?,
LASTRANK = ? WHERE ID =?""",(attempt, workerrank, idx))
COPYREMAINS += 1
if attempt < MAXTRIES:
WARNINGS +=1
print ("R%i: %s WARNING: SILENT DATA CORRUPTION %s"
" md5sum mismatch (%s:%s). Re-queuing copy %i."
% (workerrank, timestamp(), filename, srcmd5, md5sum, attempt))
# TODO: Save corrupt segments of files.
if chunk < 0:
corruptfile = destfile+"_CORRUPTED_%i" %attempt
destfile = mungePath(sourcedir, destdir, filename)
print ("R%i: %s Renaming corrupt file as %s for later analysis."
% (workerrank, timestamp(), corruptfile))
os.rename (destfile, corruptfile)
else:
print "%s ERROR: Max number of copies reached on %s."\
%(timestamp(), filename)
Abort()
else:
# md5 calc failed due to a detected error.
if VERIFY:
MD5REMAINS -= 1
statedb.execute("UPDATE FILECPY SET STATE = 6 WHERE ID = ?", (idx,))
RVERRORS += 1
destfile = mungePath(sourcedir, destdir, filename)
if chunk < 0:
print "READFAIL:%s" % destfile
else:
print "READFAIL,%d:%s" % (chunk,destfile)
else:
attempt += 1
statedb.execute("""UPDATE FILECPY SET ATTEMPTS = ?, LASTRANK = ?, STATE = 2
WHERE ID =?""",(attempt, workerrank, idx))
if attempt < MAXTRIES:
WARNINGS += 1
print ("R%i: %s WARNING: Error calculating destination"
" md5sum of %s on attempt %i. Re-trying..." \
%(workerrank, timestamp(), filename, attempt))
else:
# Retries exceeded.
print ("R%i %s ERROR: Max number of md5 attempts reached on %s."
%(timestamp(), filename))
Abort()
return()
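# Worker status codes carried in COPYRESULT messages (as handled below):
# 0 copy OK
# 1 copy failed; re-queue for retry
# 2 non-standard file type; skipped
# 3 permission denied; skipped
# 4 copied, but permissions could not be preserved
# 5 source file missing (ENOENT); retried in case the filesystem is not mounted
# 6 large file; split into CHUNKSIZE chunks and re-queued
# 7 copied, but ownership could not be preserved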
def processCopy(statedb, payload):
global WARNINGS
global COPYREMAINS
global MD5REMAINS
global TOTALROWS
md5sum = payload[0]
idx = payload[1]
workerrank = payload[2]
status = payload[3]
speed = payload[4]
size = payload[5]
stripestatus = payload[6]
filename, attempt, chunk = statedb.execute("""SELECT FILENAME, ATTEMPTS, CHUNKS FROM FILECPY
WHERE ID = ?""",(idx, )).fetchone()
# Copy is complete.
if status == 0 or status == 4 or status == 7:
statedb.execute("""UPDATE FILECPY SET STATE = 2, SRCMD5 = ?, LASTRANK = ?,
SIZE = ? WHERE ID = ? """,(md5sum, workerrank, size, idx))
COPYREMAINS -= 1
if VERBOSE:
stripetxt = ""
if LSTRIPE or FORCESTRIPE:
if stripestatus == 1:
stripetxt = "(striped)"
elif stripestatus == 0:
stripetxt = "(unstriped)"
elif stripestatus == -1:
stripetxt = "(small file: ignored striping)"
if chunk < 0:
print "R%i: %s copied %s %s %s (%s/s)" \
% (workerrank, timestamp(), filename, stripetxt,
prettyPrint(size), prettyPrint(speed))
else:
print "R%i: %s copied %s chunk %i %s (%s/s)" \
% (workerrank, timestamp(), filename, chunk,
prettyPrint(size), prettyPrint(speed))
if status == 4:
# unable to preserve permissions
WARNINGS += 1
print ("R%i: %s WARNING: unable to preserve"
" permissions on %s") \
% (workerrank, timestamp(), filename)
elif status == 7:
# unable to preserve ownership
WARNINGS += 1
print ("R%i: %s WARNING: unable to preserve"
" ownership of %s") \
% (workerrank, timestamp(), filename)
# copy failed.
elif status == 1:
if attempt < MAXTRIES:
attempt += 1
statedb.execute("""UPDATE FILECPY SET ATTEMPTS = ?,
LASTRANK = ?, STATE = 0 WHERE ID = ? """,(attempt,
workerrank, idx))
WARNINGS += 1
print ("R%i: %s WARNING: Error copying %s on attempt %i"
" Retrying..."
% (workerrank, timestamp(), filename,
attempt))
else:
print "%s ERROR: Max number of copies reached on %s" \
%(timestamp(), filename)
Abort()
# Copy failed permanently but non-fatally. Mark as done without bothering to retry.
elif status == 2:
# nonstandard filetype
statedb.execute("""UPDATE FILECPY SET STATE = ?
WHERE ID = ? """,(ENDSTATE, idx))
COPYREMAINS -= 1
MD5REMAINS -= 1
WARNINGS +=1
print "R%i: %s WARNING: unable to copy %s (%s). Skipping..." \
% (workerrank, timestamp(), filename, md5sum)
elif status == 3:
# permission denied
statedb.execute("""UPDATE FILECPY SET STATE = ?
WHERE ID = ? """,(ENDSTATE, idx))
COPYREMAINS -= 1
MD5REMAINS -= 1
WARNINGS += 1
print "R%i: %s WARNING: permission denied on %s. Skipping..." \
% (workerrank, timestamp(), filename)
elif status == 5:
# File does not exist. We do retry here, as we might be on
# a node that does not have the FS mounted.
if attempt < MAXTRIES:
attempt += 1
statedb.execute("""UPDATE FILECPY SET STATE = 0, ATTEMPTS = ?,
LASTRANK = ? WHERE ID = ?""",
(attempt, workerrank, idx))
WARNINGS += 1
print ("R%i: %s WARNING: %s No such file or directory"
" attempt %i. Retrying..."
% (workerrank, timestamp(), filename, attempt))
else:
# Treat non-existence as a non-fatal error.
# The user might simply have moved the file during the copy
statedb.execute("""UPDATE FILECPY SET STATE = ?, SRCMD5 = ?
WHERE ID = ? """,(ENDSTATE, md5sum, idx))
COPYREMAINS -= 1
MD5REMAINS -= 1
WARNINGS += 1
print ("R%i: %s WARNING %s No such file or directory on "
"attempt %i. Maybe someone moved the file?"
" Skipping...") %(workerrank, timestamp(),
filename, attempt)
elif status == 6:
chunks = int(math.ceil(size / float(CHUNKSIZE)))
with statedb:
for i in range(chunks):
sortid = random.randint(0, TOTALROWS + chunks)
statedb.execute("INSERT INTO FILECPY (FILENAME, SORTORDER, CHUNKS) VALUES (?,?,?)",
(filename, sortid, i))
statedb.execute("DELETE FROM FILECPY WHERE ID = ?", (idx,))
COPYREMAINS += chunks-1
TOTALROWS += chunks
if MD5SUM:
MD5REMAINS += chunks-1
if VERBOSE:
stripetxt = ""
if LSTRIPE or FORCESTRIPE:
if stripestatus == 1:
stripetxt = "(striped)"
elif stripestatus == 0:
stripetxt = "(unstriped)"
elif stripestatus == -1:
stripetxt = "(small file: ignored striping)"
print ("R%i: %s Large file %s: copying in %i chunks."
%(workerrank, filename, stripetxt, chunks))
return()
def ShutdownWorkers(starttime):
"""Tell workers we have no more work for them and collate the stats"""
totalfiles = 0
totalbytes = 0
if VERBOSE:
print "R0: Sending SHUTDOWN to workers"
for r in range(1, workers):
msg = ("SHUTDOWN",())
comm.send(msg, dest=r, tag=1)
if VERBOSE:
print "rank %i shutdown" % r
if VERBOSE:
print "R0: Gathering results"
data = comm.gather(0, root=0)
# Gather the runtime statistics
endtime = time.time()
totalelapsedtime = endtime - starttime
print ""
print "Copy Statisics:"
for r in range(1, workers):
filescopied, md5done, bytescopied, byteschksummed, copytime, \
md5time = data[r]
totalfiles += filescopied
totalbytes += bytescopied