-
Notifications
You must be signed in to change notification settings - Fork 76
/
Copy path_os.py
1547 lines (1309 loc) · 61.9 KB
/
_os.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
930
931
932
933
934
935
936
937
938
939
940
941
942
943
944
945
946
947
948
949
950
951
952
953
954
955
956
957
958
959
960
961
962
963
964
965
966
967
968
969
970
971
972
973
974
975
976
977
978
979
980
981
982
983
984
985
986
987
988
989
990
991
992
993
994
995
996
997
998
999
1000
# Copyright: (c) 2019, Jordan Borean (@jborean93) <[email protected]>
# MIT License (see LICENSE or https://opensource.org/licenses/MIT)
from __future__ import annotations
import collections
import datetime
import errno
import io
import ntpath
import operator
import os
import stat as py_stat
import time
import typing as t
from smbclient._io import (
SMBDirectoryIO,
SMBFileIO,
SMBFileTransaction,
SMBPipeIO,
SMBRawIO,
ioctl_request,
query_info,
set_info,
)
from smbprotocol import MAX_PAYLOAD_SIZE
from smbprotocol._text import to_bytes, to_text
from smbprotocol.exceptions import NoSuchFile, SMBOSError, SMBResponseException
from smbprotocol.file_info import (
FileAttributeTagInformation,
FileBasicInformation,
FileDispositionInformation,
FileFsFullSizeInformation,
FileFsVolumeInformation,
FileFullEaInformation,
FileInformationClass,
FileInternalInformation,
FileLinkInformation,
FileRenameInformation,
FileStandardInformation,
FileStreamInformation,
)
from smbprotocol.header import NtStatus
from smbprotocol.ioctl import (
CtlCode,
IOCTLFlags,
SMB2SrvCopyChunk,
SMB2SrvCopyChunkCopy,
SMB2SrvCopyChunkResponse,
SMB2SrvRequestResumeKey,
)
from smbprotocol.open import (
CreateOptions,
FileAttributes,
FilePipePrinterAccessMask,
QueryInfoFlags,
)
from smbprotocol.reparse_point import (
ReparseDataBuffer,
ReparseTags,
SymbolicLinkFlags,
SymbolicLinkReparseDataBuffer,
)
from smbprotocol.structure import DateTimeField
# Mirror os.XATTR_CREATE / os.XATTR_REPLACE, falling back to the literals 1 and 2 when the os module of the running
# platform does not define them.
XATTR_CREATE = getattr(os, "XATTR_CREATE", 1)
XATTR_REPLACE = getattr(os, "XATTR_REPLACE", 2)
# Limits used by copyfile() when splitting a server side copy into chunks and batching them per request.
MAX_COPY_CHUNK_SIZE = 1 * 1024 * 1024  # maximum chunksize 1M from 3.3.3 in MS-SMB documentation
MAX_COPY_CHUNK_COUNT = 16  # maximum total chunksize 16M from 3.3.3 in MS-SMB documentation
# Result tuple returned by stat(). The first ten fields follow the layout of the base os.stat_result, the remaining
# fields are SMB specific extras.
SMBStatResult = collections.namedtuple(
    "SMBStatResult",
    [
        "st_mode",  # File type and (mostly symbolic) permission bits
        "st_ino",  # The file's index number
        "st_dev",  # Serial number of the volume the file is on
        "st_nlink",  # Number of links to the file
        "st_uid",  # Always 0 in the value built by stat()
        "st_gid",  # Always 0 in the value built by stat()
        "st_size",  # End of file position
        "st_atime",  # Last access time, seconds since EPOCH
        "st_mtime",  # Last write time, seconds since EPOCH
        "st_ctime",  # Creation time, seconds since EPOCH
        # Extra attributes not part of the base stat_result
        "st_chgtime",  # ChangeTime, change of file metadata and not just data (mtime)
        "st_atime_ns",  # Same as st_atime but in nanoseconds
        "st_mtime_ns",  # Same as st_mtime but in nanoseconds
        "st_ctime_ns",  # Same as st_ctime but in nanoseconds
        "st_chgtime_ns",  # Same as st_chgtime but in nanoseconds
        "st_file_attributes",  # Raw file attributes value
        "st_reparse_tag",  # Reparse tag value reported for the path
    ],
)
# Volume size summary returned by stat_volume(); each field is a size in bytes.
SMBStatVolumeResult = collections.namedtuple(
    "SMBStatVolumeResult",
    "total_size caller_available_size actual_available_size",
)
class SMBFileStreamInformation(t.NamedTuple):
    """Name and size details describing a single file stream."""

    # NOTE(review): presumably filled from FileStreamInformation query results elsewhere in this module - confirm.
    name: str  # The name of the stream
    size: int  # presumably the length of the stream data in bytes - confirm against the producer
    allocation_size: int  # presumably the space allocated for the stream in bytes - confirm against the producer
def is_remote_path(path: str) -> bool:
    """
    Check whether a path refers to a remote SMB location instead of a local one.

    A path counts as remote when it starts with two backslashes or with two forward slashes.

    :param path: The filepath.
    :return: True when the given path is a remote SMB path, False otherwise.
    """
    return path.startswith(("\\\\", "//"))
def copyfile(src, dst, **kwargs):
    """
    Copy a file to another location on the same server using a server side copy.

    The copy fails with a ValueError if src and dst are on different servers. Any file already present at dst is
    replaced. This is not normally part of the builtin os package but because it relies on some SMB IOCTL commands it
    is useful to expose here.

    :param src: The full UNC path of the source file.
    :param dst: The full UNC path of the target file.
    :param kwargs: Common SMB Session arguments for smbclient.
    """
    norm_src = ntpath.normpath(src)
    norm_dst = ntpath.normpath(dst)

    if not is_remote_path(norm_src):
        raise ValueError("src must be an absolute path to where the file should be copied from.")

    if not is_remote_path(norm_dst):
        raise ValueError("dst must be an absolute path to where the file should be copied to.")

    # The drive of a UNC path is '\\host\share', splitting on the separator leaves the host at index 2.
    src_unc_parts = ntpath.splitdrive(norm_src)[0].split("\\")
    src_host = src_unc_parts[2]
    dst_unc_parts = ntpath.splitdrive(norm_dst)[0].split("\\")
    dst_host = dst_unc_parts[2]
    if src_host.lower() != dst_host.lower():
        raise ValueError("Cannot copy a file to a different host than the src.")

    with open_file(norm_src, mode="rb", share_access="r", buffering=0, **kwargs) as src_fd:
        # Ask the server for the resume key that identifies the source file in the copy chunk requests.
        with SMBFileTransaction(src_fd) as key_transaction:
            ioctl_request(
                key_transaction,
                CtlCode.FSCTL_SRV_REQUEST_RESUME_KEY,
                flags=IOCTLFlags.SMB2_0_IOCTL_IS_FSCTL,
                output_size=32,
            )

        key_response = SMB2SrvRequestResumeKey()
        key_response.unpack(key_transaction.results[0])
        resume_key = key_response["resume_key"].get_value()

        # Describe the whole file as a list of chunks that are each at most MAX_COPY_CHUNK_SIZE long.
        total_length = src_fd.fd.end_of_file
        chunks = []
        for chunk_offset in range(0, total_length, MAX_COPY_CHUNK_SIZE):
            chunk = SMB2SrvCopyChunk()
            chunk["source_offset"] = chunk_offset
            chunk["target_offset"] = chunk_offset
            chunk["length"] = min(MAX_COPY_CHUNK_SIZE, total_length - chunk_offset)
            chunks.append(chunk)

        with open_file(norm_dst, mode="wb", share_access="r", buffering=0, **kwargs) as dst_fd:
            # Send the chunks in batches of at most MAX_COPY_CHUNK_COUNT per IOCTL request.
            for batch_start in range(0, len(chunks), MAX_COPY_CHUNK_COUNT):
                batch = chunks[batch_start : batch_start + MAX_COPY_CHUNK_COUNT]

                copy_request = SMB2SrvCopyChunkCopy()
                copy_request["source_key"] = resume_key
                copy_request["chunks"] = batch

                with SMBFileTransaction(dst_fd) as copy_transaction:
                    ioctl_request(
                        copy_transaction,
                        CtlCode.FSCTL_SRV_COPYCHUNK_WRITE,
                        flags=IOCTLFlags.SMB2_0_IOCTL_IS_FSCTL,
                        output_size=12,
                        input_buffer=copy_request,
                    )

                expected_chunks = len(batch)
                for raw_result in copy_transaction.results:
                    copy_response = SMB2SrvCopyChunkResponse()
                    copy_response.unpack(raw_result)
                    if copy_response["chunks_written"].get_value() != expected_chunks:
                        raise OSError(
                            f"Failed to copy all the chunks in a server side copyfile: '{norm_src}' -> '{norm_dst}'"
                        )
def link(src, dst, follow_symlinks=True, **kwargs):
    """
    Create a hard link named dst that points to src. The dst path must be an absolute path on the same share as src.

    :param src: The full UNC path to use as the source of the hard link.
    :param dst: The full UNC path to create the hard link at.
    :param follow_symlinks: Whether to link to the src target (True) or src itself (False) if src is a symlink.
    :param kwargs: Common arguments used to build the SMB Session.
    """
    source_path = ntpath.normpath(src)
    target_path = ntpath.normpath(dst)

    if not is_remote_path(source_path):
        raise ValueError("src must be the absolute path to where the file is hard linked to.")

    source_root = ntpath.splitdrive(source_path)[0]
    target_root, target_name = ntpath.splitdrive(target_path)
    if source_root.lower() != target_root.lower():
        raise ValueError("Cannot hardlink a file to a different root than the src.")

    if follow_symlinks:
        open_options = 0
    else:
        open_options = CreateOptions.FILE_OPEN_REPARSE_POINT

    source_fd = SMBFileIO(
        source_path,
        mode="r",
        share_access="rwd",
        desired_access=FilePipePrinterAccessMask.FILE_WRITE_ATTRIBUTES,
        create_options=open_options,
        **kwargs,
    )

    link_info = FileLinkInformation()
    link_info["replace_if_exists"] = False
    # The part after the drive starts with a separator, drop it before sending the name.
    link_info["file_name"] = to_text(target_name[1:])

    with SMBFileTransaction(source_fd) as transaction:
        set_info(transaction, link_info)
def listdir(path, search_pattern="*", **kwargs):
    """
    List the names of the entries in the directory given by path. The names are returned in arbitrary order and the
    special entries '.' and '..' are never included, even if they are present in the directory.

    :param path: The path to the directory to list.
    :param search_pattern: The search string to match against the names of directories or files. This pattern can use
        '*' as a wildcard for multiple chars and '?' as a wildcard for a single char. Does not support regex patterns.
    :param kwargs: Common SMB Session arguments for smbclient.
    :return: A list containing the names of the entries in the directory.
    """
    with SMBDirectoryIO(path, mode="r", share_access="r", **kwargs) as dir_fd:
        try:
            entries = dir_fd.query_directory(search_pattern, FileInformationClass.FILE_NAMES_INFORMATION)
            names = []
            for entry in entries:
                name = entry["file_name"].get_value().decode("utf-16-le")
                if name in (".", ".."):
                    continue
                names.append(name)
            return names
        except NoSuchFile:
            # Nothing matched the search pattern.
            return []
def lstat(path, **kwargs):
    """
    Perform the equivalent of an lstat() system call on the given path. Similar to stat(), but does not follow
    symbolic links.

    :param path: The path to the file or directory to stat.
    :param kwargs: Common SMB Session arguments for smbclient.
    :return: See stat() for the return values.
    """
    # Same query as stat() with follow_symlinks forced off.
    return stat(path, follow_symlinks=False, **kwargs)
def mkdir(path, **kwargs):
    """
    Create the directory named path. OSError(errno.EEXIST) is raised if the directory already exists.

    :param path: The path to the directory to create.
    :param kwargs: Common SMB Session arguments for smbclient.
    """
    # The directory handle is set up with mode 'x' and the transaction is entered and left without queueing any
    # further requests.
    with SMBFileTransaction(SMBDirectoryIO(path, mode="x", **kwargs)):
        pass
def makedirs(path, exist_ok=False, **kwargs):
    """
    Recursive directory creation function. Like mkdir(), but makes all intermediate-level directories needed to contain
    the leaf directory.

    If exist_ok is False (the default), an OSError is raised if the target directory already exists.

    :param path: The path to the directory to create.
    :param exist_ok: Set to True to not fail if the target directory already exists.
    :param kwargs: Common SMB Session arguments for smbclient.
    :raises OSError: If the target already exists and exist_ok is False, if a missing parent cannot be resolved, or
        for any other error raised by mkdir() that is not EEXIST or ENOENT.
    """
    # Paths still to be created; the last element is always the next one to try.
    create_queue = [ntpath.normpath(path)]
    # The most recent path that mkdir() reported as already existing.
    present_parent = None
    while create_queue:
        mkdir_path = create_queue[-1]
        try:
            mkdir(mkdir_path, **kwargs)
        except OSError as err:
            if err.errno == errno.EEXIST:
                present_parent = mkdir_path
                create_queue.pop(-1)
                # Only an already existing target (the last entry left in the queue) is an error.
                if not create_queue and not exist_ok:
                    raise
            elif err.errno == errno.ENOENT:
                parent_path = ntpath.dirname(mkdir_path)
                # Stop instead of looping forever when the parent is already known to exist, or when there is no
                # parent left to try because dirname() returned the path unchanged (for example at a share root).
                if parent_path in (present_parent, mkdir_path):
                    raise
                create_queue.append(parent_path)
            else:
                raise
        else:
            create_queue.pop(-1)
# Literal types listing the mode strings accepted by open_file(). The open_file() overloads use them to select the
# return type that matches the requested mode.
# Taken from stdlib typeshed but removed the unused 'U' flag
# Text modes that open for updating (contain '+').
OpenTextModeUpdating: t.TypeAlias = t.Literal[
    "r+",
    "+r",
    "rt+",
    "r+t",
    "+rt",
    "tr+",
    "t+r",
    "+tr",
    "w+",
    "+w",
    "wt+",
    "w+t",
    "+wt",
    "tw+",
    "t+w",
    "+tw",
    "a+",
    "+a",
    "at+",
    "a+t",
    "+at",
    "ta+",
    "t+a",
    "+ta",
    "x+",
    "+x",
    "xt+",
    "x+t",
    "+xt",
    "tx+",
    "t+x",
    "+tx",
]
# Text modes that only write (write, append and exclusive create).
OpenTextModeWriting: t.TypeAlias = t.Literal["w", "wt", "tw", "a", "at", "ta", "x", "xt", "tx"]
# Text modes that only read.
OpenTextModeReading: t.TypeAlias = t.Literal["r", "rt", "tr"]
# Every text mode.
OpenTextMode: t.TypeAlias = t.Literal[OpenTextModeUpdating, OpenTextModeWriting, OpenTextModeReading]
# Binary modes that open for updating (contain '+').
OpenBinaryModeUpdating: t.TypeAlias = t.Literal[
    "rb+",
    "r+b",
    "+rb",
    "br+",
    "b+r",
    "+br",
    "wb+",
    "w+b",
    "+wb",
    "bw+",
    "b+w",
    "+bw",
    "ab+",
    "a+b",
    "+ab",
    "ba+",
    "b+a",
    "+ba",
    "xb+",
    "x+b",
    "+xb",
    "bx+",
    "b+x",
    "+bx",
]
# Binary modes that only write (write, append and exclusive create).
OpenBinaryModeWriting: t.TypeAlias = t.Literal["wb", "bw", "ab", "ba", "xb", "bx"]
# Binary modes that only read.
OpenBinaryModeReading: t.TypeAlias = t.Literal["rb", "br"]
# Every binary mode.
OpenBinaryMode: t.TypeAlias = t.Literal[OpenBinaryModeUpdating, OpenBinaryModeReading, OpenBinaryModeWriting]
# The values accepted for the file_type argument of open_file().
FileType: t.TypeAlias = t.Literal["file", "dir", "pipe"]
# Typing overloads for open_file(). Every overload lists its parameters in the same order as the implementation so
# that a positional call is type checked against the parameter it is bound to at runtime.
# Text mode: always returns a TextIOWrapper
@t.overload
def open_file(
    path,
    mode: OpenTextMode = "r",
    buffering=-1,
    encoding=None,
    errors=None,
    newline=None,
    share_access=None,
    desired_access=None,
    file_attributes=None,
    file_type: FileType = "file",
    **kwargs,
) -> io.TextIOWrapper[io.BufferedRandom | io.BufferedReader | io.BufferedWriter]: ...


# Otherwise return BufferedRandom, BufferedReader, or BufferedWriter
# NOTE: This incorrectly returns unbuffered opens as Buffered types, due to difficulties
# in annotating that case
# Binary mode opened for updating: BufferedRandom
@t.overload
def open_file(
    path,
    mode: OpenBinaryModeUpdating,
    buffering=-1,
    encoding=None,
    errors=None,
    newline=None,
    share_access=None,
    desired_access=None,
    file_attributes=None,
    file_type: FileType = "file",
    **kwargs,
) -> io.BufferedRandom: ...


# Binary mode opened for reading: BufferedReader
@t.overload
def open_file(
    path,
    mode: OpenBinaryModeReading,
    buffering=-1,
    encoding=None,
    errors=None,
    newline=None,
    share_access=None,
    desired_access=None,
    file_attributes=None,
    file_type: FileType = "file",
    **kwargs,
) -> io.BufferedReader: ...


# Binary mode opened for writing: BufferedWriter
@t.overload
def open_file(
    path,
    mode: OpenBinaryModeWriting,
    buffering=-1,
    encoding=None,
    errors=None,
    newline=None,
    share_access=None,
    desired_access=None,
    file_attributes=None,
    file_type: FileType = "file",
    **kwargs,
) -> io.BufferedWriter: ...
def open_file(
    path,
    mode="r",
    buffering=-1,
    encoding=None,
    errors=None,
    newline=None,
    share_access=None,
    desired_access=None,
    file_attributes=None,
    file_type: t.Literal["file", "dir", "pipe"] = "file",
    **kwargs,
):
    """
    Open a file on an SMB share and return a matching file object, raising an OSError if the file cannot be opened.
    The function is modelled on the builtin open() function but only offers what is available over SMB.

    Calling this function in a 'with' statement is recommended so the file is closed once it is no longer needed:

        with smbclient.open_file("\\\\server\\share\\file.txt") as fd:
            fd.read()

    Otherwise the .close() function will also close the handle to the file.

    :param path: The absolute pathname of the file to be opened.
    :param mode: Optional string that specifies the mode in which the file is opened. It defaults to 'r' which means
        for reading in text mode. Other common values are 'w' for writing (truncating the file if it already exists),
        'x' for exclusive creation and 'a' for appending. The available modes are:
            Open Mode
            'r': Open for reading (default).
            'w': Open for writing, truncating the file first.
            'x': Open for exclusive creation, failing if the file already exists.
            'a': Open for writing, appending to the end of the file if it exists.
            '+': Open for updating (reading and writing), can be used in conjunction with any of the above.
            Open Type - can be specified with the OpenMode
            't': Text mode (default).
            'b': Binary mode.
    :param buffering: An optional integer used to set the buffering policy. Pass 0 to switch buffering off (only
        allowed in binary mode), 1 to select line buffering (only usable in text mode), and an integer > 1 to indicate
        the size in bytes of a fixed-size chunk buffer. When no buffering argument is given, the default buffering
        is max size for a single SMB2 packet (65536). This can be higher but is dependent on the credits available
        from the server.
    :param encoding: The name of the encoding used to decode or encode the file. This should only be used in text
        mode. The default encoding is platform dependent (whatever locale.getpreferredencoding() returns), but any
        text encoding types supported by Python can be used.
    :param errors: Specifies how encoding and decoding errors are to be handled. This cannot be used in binary mode.
        A variety of standard error handlers are available, though any error handling name that has been registered
        with codecs.register_error() is also valid. See the open() docs for a list of builtin error handlers for your
        Python version.
    :param newline: Controls how universal newlines mode works. This should only be used in text mode. It can be
        'None', '', '\\n', '\\r', and '\\r\\n'.
    :param share_access: String that specifies the type of access that is allowed when a handle to this file is
        opened by another process. The default is 'None' which exclusively locks the file until the file is closed.
        The available access values are:
            'r': Allow other handles to be opened with read access.
            'w': Allow other handles to be opened with write access.
            'd': Allow other handles to be opened with delete access.
        A combination of values can be set to allow multiple access types together.
    :param desired_access: Override the access mask used when opening the file.
    :param file_attributes: Set custom file attributes when opening the file.
    :param file_type: The type of file to access, supports 'file' (default), 'dir', and 'pipe'.
    :param kwargs: Common arguments used to build the SMB Session.
    :return: The file object returned by the open() function, the type depends on the mode that was used to open the
        file.
    """
    io_classes = {"file": SMBFileIO, "dir": SMBDirectoryIO, "pipe": SMBPipeIO}
    raw_fd = io_classes[file_type](
        path,
        mode=mode,
        share_access=share_access,
        desired_access=desired_access,
        file_attributes=file_attributes,
        **kwargs,
    )

    try:
        raw_fd.open()

        text_mode = "b" not in raw_fd.mode

        # Unbuffered access hands back the raw object and is only possible for binary modes.
        if buffering == 0:
            if text_mode:
                raise ValueError("can't have unbuffered text I/O")
            return raw_fd

        # NOTE(review): unlike the builtin open(), a buffering value of 1 is also passed on as the buffer size below.
        line_buffering = buffering == 1

        # Pick the buffered wrapper that matches what the raw handle can do.
        if not raw_fd.readable():
            buffer_cls = io.BufferedWriter
        elif raw_fd.writable():
            buffer_cls = io.BufferedRandom
        else:
            buffer_cls = io.BufferedReader

        buffer_size = MAX_PAYLOAD_SIZE if buffering == -1 else buffering
        buffered_fd = buffer_cls(raw_fd, buffer_size=buffer_size)

        if text_mode:
            return io.TextIOWrapper(buffered_fd, encoding, errors, newline, line_buffering=line_buffering)
        return buffered_fd
    except Exception:
        # If there was a failure in the setup, make sure the file is closed.
        raw_fd.close()
        raise
def readlink(path, **kwargs):
    """
    Return the path that a symbolic link points to. A relative link is converted to an absolute pathname relative to
    the link itself. The link target may point to a local path and not another UNC path.

    :param path: The path to the symbolic link to read.
    :param kwargs: Common SMB Session arguments for smbclient.
    :return: The link target path.
    """
    norm_path = ntpath.normpath(path)
    reparse_buffer = _get_reparse_point(norm_path, **kwargs)
    reparse_tag = reparse_buffer["reparse_tag"]

    if reparse_tag.get_value() == ReparseTags.IO_REPARSE_TAG_SYMLINK:
        symlink_data = SymbolicLinkReparseDataBuffer()
        symlink_data.unpack(reparse_buffer["data_buffer"].get_value())
        return symlink_data.resolve_link(norm_path)

    # Any other reparse tag is not a symbolic link.
    raise ValueError(f"Cannot read link of reparse point with tag {reparse_tag} at '{norm_path}'")
def remove(path, **kwargs):
    """
    Remove (delete) the file path. If path is a directory, an IsADirectoryError is raised. Use rmdir() to remove
    directories.

    Trying to remove a file that is in use causes an exception to be raised unless the existing handle was opened with
    the Delete share access. In that case the file will be removed once all handles are closed.

    :param path: The full UNC path to the file to remove.
    :param kwargs: Common SMB Session arguments for smbclient.
    """
    # Delegate to the _delete() helper, opening the path with the file IO class.
    _delete(SMBFileIO, path, **kwargs)
def removedirs(name, **kwargs):
    """
    Remove a directory and then its parents. This behaves like rmdir() except that once the leaf directory has been
    removed, each parent directory in the path is removed in turn until a removal raises an error. That error is
    ignored because it generally means that a parent directory is not empty.

    :param name: The directory to start removing recursively from.
    :param kwargs: Common SMB Session arguments for smbclient.
    """
    current_dir = ntpath.normpath(name)
    while True:
        try:
            rmdir(current_dir, **kwargs)
        except (SMBResponseException, OSError):
            # The first failure ends the walk up the tree.
            break
        current_dir = ntpath.dirname(current_dir)
def rename(src, dst, **kwargs):
    """
    Rename the file or directory src to dst. If dst exists, the operation will fail with an OSError subclass in a
    number of cases.

    :param src: The path to the file or directory to rename.
    :param dst: The path to rename the file or directory to.
    :param kwargs: Common SMB Session arguments for smbclient.
    """
    # Same helper as replace() but without replacing an existing dst.
    _rename_information(src, dst, replace_if_exists=False, **kwargs)
def renames(old, new, **kwargs):
    """
    Rename a file or directory, creating and pruning directories as needed. This works like rename() except that any
    intermediate directories needed for the new pathname are created first. After the rename, directories
    corresponding to the rightmost path segments of the old name are pruned away using removedirs().

    :param old: The path to the file or directory to rename.
    :param new: The path to rename the file or directory to.
    :param kwargs: Common SMB Session arguments for smbclient.
    """
    # Make sure the directory that will contain the new name is there.
    new_parent = ntpath.dirname(new)
    makedirs(new_parent, exist_ok=True, **kwargs)

    rename(old, new, **kwargs)

    # Prune the directories the old name has left behind.
    old_parent = ntpath.dirname(old)
    removedirs(old_parent, **kwargs)
def replace(src, dst, **kwargs):
    """
    Rename the file or directory src to dst. If dst exists and is a directory, OSError will be raised. If dst exists
    and is a file, it will be replaced silently if the user has permission. The path at dst must be on the same share
    as the src file or folder.

    :param src: The path to the file or directory to rename.
    :param dst: The path to rename the file or directory to.
    :param kwargs: Common SMB Session arguments for smbclient.
    """
    # Same helper as rename() but asking for an existing dst to be replaced.
    _rename_information(src, dst, replace_if_exists=True, **kwargs)
def rmdir(path, **kwargs):
    """
    Remove (delete) the directory path. If the directory does not exist or is not empty, a FileNotFoundError or an
    OSError is raised respectively.

    :param path: The path to the directory to remove.
    :param kwargs: Common SMB Session arguments for smbclient.
    """
    # Delegate to the _delete() helper, opening the path with the directory IO class.
    _delete(SMBDirectoryIO, path, **kwargs)
def scandir(path, search_pattern="*", **kwargs):
    """
    Iterate over DirEntry objects for the entries in the directory given by path. The entries are yielded in
    arbitrary order, and the special entries '.' and '..' are not included.

    Using scandir() instead of listdir() can significantly increase the performance of code that also needs file type
    or file attribute information, because DirEntry objects expose this information if the SMB server provides it when
    scanning a directory. All DirEntry methods may perform a SMB request, but is_dir(), is_file(), is_symlink() usually
    only require one system call unless the file or directory is a reparse point which requires 2 calls. See the
    Python documentation for how DirEntry is set up and the methods and attributes that are available.

    :param path: The path to a directory to scan.
    :param search_pattern: The search string to match against the names of directories or files. This pattern can use
        '*' as a wildcard for multiple chars and '?' as a wildcard for a single char. Does not support regex patterns.
    :param kwargs: Common SMB Session arguments for smbclient.
    :return: An iterator of DirEntry objects in the directory.
    """
    connection_cache = kwargs.get("connection_cache")

    # Fields copied as they are from each raw directory entry into the SMBDirEntryInformation value.
    copied_fields = (
        "creation_time",
        "last_access_time",
        "last_write_time",
        "change_time",
        "end_of_file",
        "allocation_size",
        "file_attributes",
        "ea_size",
        "file_id",
    )

    with SMBDirectoryIO(path, share_access="rwd", **kwargs) as fd:
        raw_entries = fd.query_directory(search_pattern, FileInformationClass.FILE_ID_FULL_DIRECTORY_INFORMATION)
        for raw_dir_info in raw_entries:
            filename = raw_dir_info["file_name"].get_value().decode("utf-16-le")
            if filename in (".", ".."):
                continue

            field_values = {field: raw_dir_info[field].get_value() for field in copied_fields}
            dir_info = SMBDirEntryInformation(file_name=filename, **field_values)

            yield SMBDirEntry(
                SMBRawIO(rf"{path}\{filename}", **kwargs),
                dir_info,
                connection_cache=connection_cache,
            )
def stat(path, follow_symlinks=True, **kwargs):
    """
    Get the status of a file, the equivalent of a stat() system call on the given path.

    Symlinks are normally followed; pass follow_symlinks=False to stat the symlink itself.

    :param path: The path to the file or directory to stat.
    :param follow_symlinks: Whether to open the file's reparse point if present during the open. In most scenarios
        this means to stat() the symlink target if the path is a symlink or not.
    :param kwargs: Common SMB Session arguments for smbclient.
    :return: A tuple representing the stat result of the path. This contains the standard tuple entries as
        os.stat_result as well as:
            st_chgtime: The time, seconds since EPOCH, when the file's metadata was last changed.
            st_atime_ns: Same as st_atime but measured in nanoseconds
            st_mtime_ns: Same as st_mtime but measured in nanoseconds
            st_ctime_ns: Same as st_ctime but measured in nanoseconds
            st_chgtime_ns: Same as st_chgtime but measured in nanoseconds
            st_file_attributes: An int representing the Windows FILE_ATTRIBUTES_* constants.
            st_reparse_tag: An int representing the Windows IO_REPARSE_TAG_* constants. This is set to 0 unless
                follow_symlinks=False and the path is a reparse point. See smbprotocol.reparse_point.ReparseTags.
    """
    raw = SMBRawIO(
        path,
        mode="r",
        share_access="rwd",
        desired_access=FilePipePrinterAccessMask.FILE_READ_ATTRIBUTES,
        create_options=0 if follow_symlinks else CreateOptions.FILE_OPEN_REPARSE_POINT,
        **kwargs,
    )

    # All queries are queued on one transaction, the results come back in the order they were queued.
    with SMBFileTransaction(raw) as transaction:
        query_info(transaction, FileBasicInformation)
        # volume_label is variable and can return up to the first 32 chars (32 * 2 for UTF-16) + null padding
        query_info(transaction, FileFsVolumeInformation, output_buffer_length=88)
        query_info(transaction, FileInternalInformation)
        query_info(transaction, FileStandardInformation)
        query_info(transaction, FileAttributeTagInformation)

    basic_info, fs_volume, internal_info, standard_info, attribute_tag = transaction.results

    reparse_tag = attribute_tag["reparse_tag"].get_value()
    file_attributes = basic_info["file_attributes"]

    # Permission bits are mostly symbolic, holdover from python stat behaviour
    if file_attributes.has_flag(FileAttributes.FILE_ATTRIBUTE_DIRECTORY):
        type_bits = py_stat.S_IFDIR
        permission_bits = 0o111
    else:
        type_bits = py_stat.S_IFREG
        permission_bits = 0

    if file_attributes.has_flag(FileAttributes.FILE_ATTRIBUTE_READONLY):
        permission_bits |= 0o444
    else:
        permission_bits |= 0o666

    if reparse_tag == ReparseTags.IO_REPARSE_TAG_SYMLINK:
        # Python reports a symbolic link as S_IFLNK in place of S_IFDIR or S_IFREG. Only symbolic links get this
        # treatment, other reparse point tags like junction points keep their type.
        type_bits = py_stat.S_IFLNK

    st_mode = type_bits | permission_bits

    # The time fields are 100s of nanoseconds since 1601-01-01 UTC and we need to convert to nanoseconds since EPOCH.
    epoch_ft = DateTimeField.EPOCH_FILETIME

    def unix_ns(field_name):
        return (basic_info[field_name].get_value() - epoch_ft) * 100

    atime_ns = unix_ns("last_access_time")
    mtime_ns = unix_ns("last_write_time")
    ctime_ns = unix_ns("creation_time")
    chgtime_ns = unix_ns("change_time")

    ns_per_second = 1000000000

    return SMBStatResult(
        st_mode=st_mode,
        st_ino=internal_info["index_number"].get_value(),
        st_dev=fs_volume["volume_serial_number"].get_value(),
        st_nlink=standard_info["number_of_links"].get_value(),
        st_uid=0,
        st_gid=0,
        st_size=standard_info["end_of_file"].get_value(),
        st_atime=atime_ns / ns_per_second,
        st_mtime=mtime_ns / ns_per_second,
        st_ctime=ctime_ns / ns_per_second,
        st_chgtime=chgtime_ns / ns_per_second,
        st_atime_ns=atime_ns,
        st_mtime_ns=mtime_ns,
        st_ctime_ns=ctime_ns,
        st_chgtime_ns=chgtime_ns,
        st_file_attributes=file_attributes.get_value(),
        st_reparse_tag=reparse_tag,
    )
def stat_volume(path, **kwargs):
    """
    Get the stat of a volume. Currently only the volume size information is returned.

    :param path: The path to the file or directory on a file system volume to stat.
    :param kwargs: Common SMB Session arguments for smbclient.
    :return: A tuple representing the full size result:
        total_size: Total size of the file system
        caller_available_size: Available size for the logged user of the file system
        actual_available_size: Available size of the file system
    """
    raw = SMBRawIO(
        path,
        mode="r",
        share_access="rwd",
        desired_access=FilePipePrinterAccessMask.FILE_READ_ATTRIBUTES,
        **kwargs,
    )
    with SMBFileTransaction(raw) as transaction:
        query_info(transaction, FileFsFullSizeInformation)

    full_size = transaction.results[0]

    # The server reports sizes in allocation units, work out how many bytes one unit is.
    sectors_per_unit = full_size["sectors_per_unit"].get_value()
    bytes_per_sector = full_size["bytes_per_sector"].get_value()
    unit_in_bytes = sectors_per_unit * bytes_per_sector

    # Result field name -> name of the unit count it is derived from.
    unit_fields = {
        "total_size": "total_allocation_units",
        "caller_available_size": "caller_available_units",
        "actual_available_size": "actual_available_units",
    }
    sizes = {name: full_size[source].get_value() * unit_in_bytes for name, source in unit_fields.items()}
    return SMBStatVolumeResult(**sizes)
def symlink(src, dst, target_is_directory=False, **kwargs):
    """
    Create a symbolic link at dst that points to src.

    dst must be an absolute UNC path. src may either be an absolute UNC path, which creates an absolute symlink, or
    a relative path, which creates a relative symlink that is resolved against the directory containing dst. In
    both cases the resolved src must be in the same share as dst.

    If the resolved src exists, the kind of symlink (file or directory) is taken from the type of src. If it does
    not exist, target_is_directory decides which kind of symlink is created.

    Note the server must support creating a reparse point using the FSCTL_SET_REPARSE_POINT code. This is typically
    only Windows servers.

    :param src: The target of the symlink.
    :param dst: The path where the symlink is to be created.
    :param target_is_directory: If src does not exist, controls whether a file or directory symlink is created.
    :param kwargs: Common SMB Session arguments for smbclient.
    :raises ValueError: If dst is not a UNC path or the resolved src has a different root than dst.
    :raises OSError: If checking src fails for any reason other than src not existing.
    """
    link_path = ntpath.normpath(dst)
    if not is_remote_path(link_path):
        raise ValueError("The link dst must be an absolute UNC path for where the link is to be created")

    target_path = ntpath.normpath(src)
    # The print name is always the normalised src exactly as the caller supplied it.
    print_name = target_path

    if is_remote_path(target_path):
        flags = SymbolicLinkFlags.SYMLINK_FLAG_ABSOLUTE
        # Swap the leading '\\' of the UNC path for the '\??\UNC\' prefix.
        substitute_name = "\\??\\UNC\\" + target_path[2:]
    else:
        flags = SymbolicLinkFlags.SYMLINK_FLAG_RELATIVE
        substitute_name = target_path
        # Resolve the relative target against the link's directory so it can be validated and stat'd below.
        target_path = ntpath.abspath(ntpath.join(ntpath.dirname(link_path), target_path))

    target_root = ntpath.splitdrive(target_path)[0]
    link_root = ntpath.splitdrive(link_path)[0]
    if target_root.lower() != link_root.lower():
        raise ValueError(f"Resolved link src root '{target_root}' must be the same as the dst root '{link_root}'")

    try:
        target_stat = stat(target_path, **kwargs)
    except OSError as err:
        # A missing target is fine, the caller's target_is_directory is used; anything else is a real failure.
        if err.errno != errno.ENOENT:
            raise
    else:
        # The target exists, its actual type wins over whatever the caller passed in.
        target_is_directory = py_stat.S_ISDIR(target_stat.st_mode)

    symlink_data = SymbolicLinkReparseDataBuffer()
    symlink_data["flags"] = flags
    symlink_data.set_name(substitute_name, print_name)

    reparse_data = ReparseDataBuffer()
    reparse_data["reparse_tag"] = ReparseTags.IO_REPARSE_TAG_SYMLINK
    reparse_data["data_buffer"] = symlink_data

    if target_is_directory:
        type_option = CreateOptions.FILE_DIRECTORY_FILE
    else:
        type_option = CreateOptions.FILE_NON_DIRECTORY_FILE
    create_options = CreateOptions.FILE_OPEN_REPARSE_POINT | type_option

    # Mode "x" is used to open dst; the reparse data is then set on that handle.
    link_io = SMBRawIO(
        link_path,
        mode="x",
        desired_access=FilePipePrinterAccessMask.FILE_WRITE_ATTRIBUTES,
        create_options=create_options,
        **kwargs,
    )
    with SMBFileTransaction(link_io) as transaction:
        ioctl_request(
            transaction,
            CtlCode.FSCTL_SET_REPARSE_POINT,
            flags=IOCTLFlags.SMB2_0_IOCTL_IS_FSCTL,
            input_buffer=reparse_data,
        )
def truncate(path, length, **kwargs):
    """
    Cut the file at path down so that it is no larger than length bytes.

    The file is opened in binary append mode ("ab") and the opened handle's truncate() is called with length.

    :param path: The path for the file to truncate.
    :param length: The length in bytes to truncate the file to.
    :param kwargs: Common SMB Session arguments for smbclient.
    """
    with open_file(path, mode="ab", **kwargs) as file_handle:
        file_handle.truncate(length)
def unlink(path, **kwargs):
    """
    Delete the file at path.

    This is the traditional Unix name for remove(); the call is forwarded to remove() unchanged, so see the
    documentation for remove() for the full behaviour.

    :param path: The full UNC path to the file to remove.
    :param kwargs: Common SMB Session arguments for smbclient.
    """
    # Pure alias: every argument is handed straight to remove().
    remove(path, **kwargs)
def utime(path, times=None, ns=None, follow_symlinks=True, **kwargs):
    """
    Set the access and modified times of the file specified by path.

    utime() takes two optional parameters, times and ns. These specify the times set on path and are used as follows:

        * If ns is specified, it must be a 2-tuple of the form (atime_ns, mtime_ns) where each member is an int
            expressing nanoseconds. Note SMB has a precision of 100's of nanoseconds.
        * If times is not None, it must be a 2-tuple of the form (atime, mtime) where each member is an int or float
            expressing seconds.
        * If times and ns is None, this is equivalent to specifying ns=(atime_ns, mtime_ns) where both times are the
            current time.

    It is an error to specify tuples for both times and ns. A tuple is considered specified whenever it is not None,
    so an empty tuple is rejected rather than being silently treated as "use the current time".

    :param path: The full UNC path to the file or directory to update the time.
    :param times: A 2-tuple of the form (atime, mtime)
    :param ns: A 2-tuple of the form (atime_ns, mtime_ns)
    :param follow_symlinks: Whether to follow symlinks when opening path.
    :param kwargs: Common SMB Session arguments for smbclient.
    :raises ValueError: If both times and ns are given, or the given tuple does not have exactly 2 members.
    """
    if times is not None and ns is not None:
        raise ValueError("Both times and ns have been set for utime.")

    if times is not None or ns is not None:
        time_tuple = times if times is not None else ns
        if len(time_tuple) != 2:
            raise ValueError("The time tuple should be a 2-tuple of the form (atime, mtime).")

        # A FILETIME is an integer count of 100 nanosecond intervals, so the result is always coerced to int; this
        # matters for float seconds which would otherwise stay a float after scaling.
        if times is not None:
            # seconds -> 100s of nanoseconds
            intervals = [int(t * 10000000) for t in time_tuple]
        else:
            # nanoseconds -> 100s of nanoseconds
            intervals = [int(t // 100) for t in time_tuple]

        # EPOCH_FILETIME is EPOCH represented as MS FILETIME (100s of nanoseconds since 1601-01-01)
        atime, mtime = [i + DateTimeField.EPOCH_FILETIME for i in intervals]
    else:
        # time_ns() was only added in Python 3.7
        time_ns = getattr(time, "time_ns", None)
        if time_ns is not None:
            now_ns = time_ns()
        else:  # pragma: no cover
            # Scale before truncating so the sub-second part of the current time is preserved.
            now_ns = int(time.time() * 1000000000)

        atime = mtime = (now_ns // 100) + DateTimeField.EPOCH_FILETIME

    _set_basic_information(
        path, last_access_time=atime, last_write_time=mtime, follow_symlinks=follow_symlinks, **kwargs
    )
def walk(top, topdown=True, onerror=None, follow_symlinks=False, **kwargs):
"""
Generate the file names in a directory tree by walking the tree either top-down or bottom-up. For each directory
in the tree rooted at directory top (including top itself), it yields a 3-tuple (dirpath, dirnames, filenames).
dirpath is a string, the path to the directory, dirnames is a list of names of the subdirectories in dirpath
(excluding '.' and '..''). filenames is a list of names of the non-directory files in dirpath. Note that the names
in the lists contain no path components. To get a full path (which beings with top) to a file or directory in
dirpath, do ntpath.join(dirpath, name).
If optional argument topdown is True or not specified, the triple for a directory is generated before the triples
for any of its subdirectories (directories are generated top-down). If topdown is False, the triple for a directory
is generated after the triples for all of its subdirectories (directories are generated bottom-up). No matter the
value of topdown, the list of subdirectories is retrieved before the tuples for the directory and its
subdirectories are generated.
When topdown is True, the caller can modify the dirnames list in-place (perhaps using del or slice assignment) and
walk() will only recurse into the subdirectories whose names remain in dirnames; this can be used to prune the
search, impose a specific order of visting, or even to inform walk() about directories the caller creates or
renames before it resumes walk() again. Modifying dirnames when topdown is False has no effect on the behaviour of
the walk, because in bottom-up mode the directories in dirnames are generated before dirpath itself is generated.
By default, errors from scandir() call are ignored. If optional argument onerror is specified, it should be a