@@ -20,8 +20,8 @@ package org.apache.spark.storage
20
20
import java .text .SimpleDateFormat
21
21
import java .util .{Date , Random }
22
22
23
- import tachyon .client . TachyonFS
24
- import tachyon .client .TachyonFile
23
+ import tachyon .TachyonURI
24
+ import tachyon .client .{ TachyonFile , TachyonFS }
25
25
26
26
import org .apache .spark .Logging
27
27
import org .apache .spark .executor .ExecutorExitCode
@@ -40,7 +40,7 @@ private[spark] class TachyonBlockManager(
40
40
val master : String )
41
41
extends Logging {
42
42
43
- val client = if (master != null && master != " " ) TachyonFS .get(master) else null
43
+ val client = if (master != null && master != " " ) TachyonFS .get(new TachyonURI ( master) ) else null
44
44
45
45
if (client == null ) {
46
46
logError(" Failed to connect to the Tachyon as the master address is not configured" )
@@ -60,11 +60,11 @@ private[spark] class TachyonBlockManager(
60
60
addShutdownHook()
61
61
62
62
def removeFile (file : TachyonFile ): Boolean = {
63
- client.delete(file.getPath(), false )
63
+ client.delete(new TachyonURI ( file.getPath() ), false )
64
64
}
65
65
66
66
def fileExists (file : TachyonFile ): Boolean = {
67
- client.exist(file.getPath())
67
+ client.exist(new TachyonURI ( file.getPath() ))
68
68
}
69
69
70
70
def getFile (filename : String ): TachyonFile = {
@@ -81,15 +81,15 @@ private[spark] class TachyonBlockManager(
81
81
if (old != null ) {
82
82
old
83
83
} else {
84
- val path = tachyonDirs(dirId) + " / " + " %02x" .format(subDirId)
84
+ val path = new TachyonURI ( s " ${ tachyonDirs(dirId)} / ${ " %02x" .format(subDirId)} " )
85
85
client.mkdir(path)
86
86
val newDir = client.getFile(path)
87
87
subDirs(dirId)(subDirId) = newDir
88
88
newDir
89
89
}
90
90
}
91
91
}
92
- val filePath = subDir + " / " + filename
92
+ val filePath = new TachyonURI ( s " $subDir / $ filename" )
93
93
if (! client.exist(filePath)) {
94
94
client.createFile(filePath)
95
95
}
@@ -101,7 +101,7 @@ private[spark] class TachyonBlockManager(
101
101
102
102
// TODO: Some of the logic here could be consolidated/de-duplicated with that in the DiskStore.
103
103
private def createTachyonDirs (): Array [TachyonFile ] = {
104
- logDebug(" Creating tachyon directories at root dirs '" + rootDirs + " '" )
104
+ logDebug(s " Creating tachyon directories at root dirs ' $ rootDirs' " )
105
105
val dateFormat = new SimpleDateFormat (" yyyyMMddHHmmss" )
106
106
rootDirs.split(" ," ).map { rootDir =>
107
107
var foundLocalDir = false
@@ -113,22 +113,21 @@ private[spark] class TachyonBlockManager(
113
113
tries += 1
114
114
try {
115
115
tachyonDirId = " %s-%04x" .format(dateFormat.format(new Date ), rand.nextInt(65536 ))
116
- val path = rootDir + " / " + " spark-tachyon-" + tachyonDirId
116
+ val path = new TachyonURI ( s " $rootDir / spark-tachyon-$ tachyonDirId" )
117
117
if (! client.exist(path)) {
118
118
foundLocalDir = client.mkdir(path)
119
119
tachyonDir = client.getFile(path)
120
120
}
121
121
} catch {
122
122
case e : Exception =>
123
- logWarning(" Attempt " + tries + " to create tachyon dir " + tachyonDir + " failed" , e)
123
+ logWarning(s " Attempt $ tries to create tachyon dir $ tachyonDir failed " , e)
124
124
}
125
125
}
126
126
if (! foundLocalDir) {
127
- logError(" Failed " + MAX_DIR_CREATION_ATTEMPTS + " attempts to create tachyon dir in " +
128
- rootDir)
127
+ logError(s " Failed $MAX_DIR_CREATION_ATTEMPTS attempts to create tachyon dir in $rootDir" )
129
128
System .exit(ExecutorExitCode .TACHYON_STORE_FAILED_TO_CREATE_DIR )
130
129
}
131
- logInfo(" Created tachyon directory at " + tachyonDir)
130
+ logInfo(s " Created tachyon directory at $ tachyonDir" )
132
131
tachyonDir
133
132
}
134
133
}
@@ -145,7 +144,7 @@ private[spark] class TachyonBlockManager(
145
144
}
146
145
} catch {
147
146
case e : Exception =>
148
- logError(" Exception while deleting tachyon spark dir: " + tachyonDir, e)
147
+ logError(s " Exception while deleting tachyon spark dir: $ tachyonDir" , e)
149
148
}
150
149
}
151
150
client.close()
0 commit comments