Merge remote-tracking branch 'upstream/master' into skip_transform
imback82 committed Apr 2, 2021
2 parents b6b5304 + 8724f2b commit b98c15c
Showing 336 changed files with 21,367 additions and 1,367 deletions.
94 changes: 93 additions & 1 deletion .github/workflows/build_and_test.yml
@@ -327,7 +327,7 @@ jobs:
run: |
apt-get install -y libcurl4-openssl-dev libgit2-dev libssl-dev libxml2-dev
Rscript -e "install.packages(c('devtools'), repos='https://cloud.r-project.org/')"
Rscript -e "devtools::install_github('jimhester/[email protected].0')"
Rscript -e "devtools::install_github('jimhester/[email protected].1')"
./R/install-dev.sh
- name: Install dependencies for documentation generation
run: |
@@ -367,6 +367,17 @@ jobs:
steps:
- name: Checkout Spark repository
uses: actions/checkout@v2
- name: Cache Scala, SBT and Maven
uses: actions/cache@v2
with:
path: |
build/apache-maven-*
build/scala-*
build/*.jar
~/.sbt
key: build-${{ hashFiles('**/pom.xml', 'project/build.properties', 'build/mvn', 'build/sbt', 'build/sbt-launch-lib.bash', 'build/spark-build-info') }}
restore-keys: |
build-
- name: Cache Maven local repository
uses: actions/cache@v2
with:
@@ -392,6 +403,17 @@ jobs:
steps:
- name: Checkout Spark repository
uses: actions/checkout@v2
- name: Cache Scala, SBT and Maven
uses: actions/cache@v2
with:
path: |
build/apache-maven-*
build/scala-*
build/*.jar
~/.sbt
key: build-${{ hashFiles('**/pom.xml', 'project/build.properties', 'build/mvn', 'build/sbt', 'build/sbt-launch-lib.bash', 'build/spark-build-info') }}
restore-keys: |
build-
- name: Cache Coursier local repository
uses: actions/cache@v2
with:
@@ -414,6 +436,17 @@ jobs:
steps:
- name: Checkout Spark repository
uses: actions/checkout@v2
- name: Cache Scala, SBT and Maven
uses: actions/cache@v2
with:
path: |
build/apache-maven-*
build/scala-*
build/*.jar
~/.sbt
key: build-${{ hashFiles('**/pom.xml', 'project/build.properties', 'build/mvn', 'build/sbt', 'build/sbt-launch-lib.bash', 'build/spark-build-info') }}
restore-keys: |
build-
- name: Cache Coursier local repository
uses: actions/cache@v2
with:
@@ -428,3 +461,62 @@ jobs:
- name: Build with SBT
run: |
./build/sbt -Pyarn -Pmesos -Pkubernetes -Phive -Phive-thriftserver -Phadoop-cloud -Pkinesis-asl -Phadoop-2.7 compile test:compile
tpcds-1g:
name: Run TPC-DS queries with SF=1
runs-on: ubuntu-20.04
steps:
- name: Checkout Spark repository
uses: actions/checkout@v2
- name: Cache TPC-DS generated data
id: cache-tpcds-sf-1
uses: actions/cache@v2
with:
path: ./tpcds-sf-1
key: tpcds-${{ hashFiles('tpcds-sf-1/.spark-tpcds-sf-1.md5') }}
restore-keys: |
tpcds-
- name: Checkout TPC-DS (SF=1) generated data repository
if: steps.cache-tpcds-sf-1.outputs.cache-hit != 'true'
uses: actions/checkout@v2
with:
repository: maropu/spark-tpcds-sf-1
ref: 6b660a53091bd6d23cbe58b0f09aae08e71cc667
path: ./tpcds-sf-1
- name: Cache Scala, SBT and Maven
uses: actions/cache@v2
with:
path: |
build/apache-maven-*
build/scala-*
build/*.jar
~/.sbt
key: build-${{ hashFiles('**/pom.xml', 'project/build.properties', 'build/mvn', 'build/sbt', 'build/sbt-launch-lib.bash', 'build/spark-build-info') }}
restore-keys: |
build-
- name: Cache Coursier local repository
uses: actions/cache@v2
with:
path: ~/.cache/coursier
key: tpcds-coursier-${{ hashFiles('**/pom.xml', '**/plugins.sbt') }}
restore-keys: |
tpcds-coursier-
- name: Install Java 8
uses: actions/setup-java@v1
with:
java-version: 8
- name: Run TPC-DS queries
run: |
SPARK_TPCDS_DATA=`pwd`/tpcds-sf-1 build/sbt "sql/testOnly org.apache.spark.sql.TPCDSQueryTestSuite"
- name: Upload test results to report
if: always()
uses: actions/upload-artifact@v2
with:
name: test-results-tpcds--8-hadoop3.2-hive2.3
path: "**/target/test-reports/*.xml"
- name: Upload unit tests log files
if: failure()
uses: actions/upload-artifact@v2
with:
name: unit-tests-log-tpcds--8-hadoop3.2-hive2.3
path: "**/target/unit-tests.log"
(next changed file; name not shown in this extract)
@@ -165,8 +165,9 @@ public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exc
if (hasInFlightRequests) {
String address = getRemoteAddress(ctx.channel());
logger.error("Connection to {} has been quiet for {} ms while there are outstanding " +
"requests. Assuming connection is dead; please adjust spark.network.timeout if " +
"this is wrong.", address, requestTimeoutNs / 1000 / 1000);
"requests. Assuming connection is dead; please adjust" +
" spark.{}.io.connectionTimeout if this is wrong.",
address, requestTimeoutNs / 1000 / 1000, transportContext.getConf().getModuleName());
client.timeOut();
ctx.close();
} else if (closeIdleConnections) {
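The reworded message above points operators at the per-module connection timeout (via transportContext.getConf().getModuleName()) instead of the global spark.network.timeout. A hedged sketch of how such an override might be set for the shuffle module; the values and the choice of module are illustrative, not taken from this commit:

    import org.apache.spark.SparkConf

    // Per-module timeout named in the new log line, for the "shuffle" module,
    // plus the global fallback the old message referred to.
    val conf = new SparkConf()
      .set("spark.shuffle.io.connectionTimeout", "240s")
      .set("spark.network.timeout", "120s")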
(next changed file; name not shown in this extract)
@@ -19,6 +19,7 @@

import java.io.File;
import java.io.IOException;
import java.net.URL;
import java.nio.charset.StandardCharsets;
import java.nio.ByteBuffer;
import java.util.List;
@@ -75,6 +76,20 @@
* is because an application running on the same Yarn cluster may choose to not use the external
* shuffle service, in which case its setting of `spark.authenticate` should be independent of
* the service's.
*
* The shuffle service will produce metrics via the YARN NodeManager's {@code metrics2} system
* under a namespace specified by the {@value SPARK_SHUFFLE_SERVICE_METRICS_NAMESPACE_KEY} config.
*
* By default, all configurations for the shuffle service will be taken directly from the
* Hadoop {@link Configuration} passed by the YARN NodeManager. It is also possible to configure
* the shuffle service by placing a resource named
* {@value SHUFFLE_SERVICE_CONF_OVERLAY_RESOURCE_NAME} into the classpath, which should be an
* XML file in the standard Hadoop Configuration resource format. Note that when the shuffle
* service is loaded in the default manner, without configuring
* {@code yarn.nodemanager.aux-services.<service>.classpath}, this file must be on the classpath
* of the NodeManager itself. When using the {@code classpath} configuration, it can be present
* either on the NodeManager's classpath, or specified in the classpath configuration.
* This {@code classpath} configuration is only supported on YARN versions >= 2.9.0.
*/
public class YarnShuffleService extends AuxiliaryService {
private static final Logger logger = LoggerFactory.getLogger(YarnShuffleService.class);
@@ -83,6 +98,14 @@ public class YarnShuffleService extends AuxiliaryService {
private static final String SPARK_SHUFFLE_SERVICE_PORT_KEY = "spark.shuffle.service.port";
private static final int DEFAULT_SPARK_SHUFFLE_SERVICE_PORT = 7337;

/**
* The namespace to use for the metrics record which will contain all metrics produced by the
* shuffle service.
*/
static final String SPARK_SHUFFLE_SERVICE_METRICS_NAMESPACE_KEY =
"spark.yarn.shuffle.service.metrics.namespace";
private static final String DEFAULT_SPARK_SHUFFLE_SERVICE_METRICS_NAME = "sparkShuffleService";

// Whether the shuffle server should authenticate fetch requests
private static final String SPARK_AUTHENTICATE_KEY = "spark.authenticate";
private static final boolean DEFAULT_SPARK_AUTHENTICATE = false;
@@ -103,6 +126,13 @@ public class YarnShuffleService extends AuxiliaryService {
private static final LevelDBProvider.StoreVersion CURRENT_VERSION = new LevelDBProvider
.StoreVersion(1, 0);

/**
* The name of the resource to search for on the classpath to find a shuffle service-specific
* configuration overlay. If found, this will be parsed as a standard Hadoop
* {@link Configuration config} file and will override the configs passed from the NodeManager.
*/
static final String SHUFFLE_SERVICE_CONF_OVERLAY_RESOURCE_NAME = "spark-shuffle-site.xml";

// just for integration tests that want to look at this file -- in general not sensible as
// a static
@VisibleForTesting
@@ -139,6 +169,13 @@ public class YarnShuffleService extends AuxiliaryService {
private DB db;

public YarnShuffleService() {
// The name of the auxiliary service configured within the NodeManager
// (`yarn.nodemanager.aux-services`) is treated as the source-of-truth, so this one can be
// arbitrary. The NodeManager will log a warning if the configured name doesn't match this name,
// to inform operators of a potential misconfiguration, but this name is otherwise not used.
// It is hard-coded instead of using the value of the `spark.shuffle.service.name` configuration
// because at this point in instantiation there is no Configuration object; it is not passed
// until `serviceInit` is called, at which point it's too late to adjust the name.
super("spark_shuffle");
logger.info("Initializing YARN shuffle service for Spark");
instance = this;
@@ -157,10 +194,18 @@ private boolean isAuthenticationEnabled() {
* Start the shuffle server with the given configuration.
*/
@Override
protected void serviceInit(Configuration conf) throws Exception {
_conf = conf;
protected void serviceInit(Configuration externalConf) throws Exception {
_conf = new Configuration(externalConf);
URL confOverlayUrl = Thread.currentThread().getContextClassLoader()
.getResource(SHUFFLE_SERVICE_CONF_OVERLAY_RESOURCE_NAME);
if (confOverlayUrl != null) {
logger.info("Initializing Spark YARN shuffle service with configuration overlay from {}",
confOverlayUrl);
_conf.addResource(confOverlayUrl);
}
super.serviceInit(_conf);

boolean stopOnFailure = conf.getBoolean(STOP_ON_FAILURE_KEY, DEFAULT_STOP_ON_FAILURE);
boolean stopOnFailure = _conf.getBoolean(STOP_ON_FAILURE_KEY, DEFAULT_STOP_ON_FAILURE);

try {
// In case this NM was killed while there were running spark applications, we need to restore
@@ -172,7 +217,7 @@ protected void serviceInit(Configuration conf) throws Exception {
registeredExecutorFile = initRecoveryDb(RECOVERY_FILE_NAME);
}

TransportConf transportConf = new TransportConf("shuffle", new HadoopConfigProvider(conf));
TransportConf transportConf = new TransportConf("shuffle", new HadoopConfigProvider(_conf));
MergedShuffleFileManager shuffleMergeManager = newMergedShuffleFileManagerInstance(
transportConf);
blockHandler = new ExternalBlockHandler(
@@ -181,7 +226,7 @@ protected void serviceInit(Configuration conf) throws Exception {
// If authentication is enabled, set up the shuffle server to use a
// special RPC handler that filters out unauthenticated fetch requests
List<TransportServerBootstrap> bootstraps = Lists.newArrayList();
boolean authEnabled = conf.getBoolean(SPARK_AUTHENTICATE_KEY, DEFAULT_SPARK_AUTHENTICATE);
boolean authEnabled = _conf.getBoolean(SPARK_AUTHENTICATE_KEY, DEFAULT_SPARK_AUTHENTICATE);
if (authEnabled) {
secretManager = new ShuffleSecretManager();
if (_recoveryPath != null) {
@@ -190,7 +235,7 @@ protected void serviceInit(Configuration conf) throws Exception {
bootstraps.add(new AuthServerBootstrap(transportConf, secretManager));
}

int port = conf.getInt(
int port = _conf.getInt(
SPARK_SHUFFLE_SERVICE_PORT_KEY, DEFAULT_SPARK_SHUFFLE_SERVICE_PORT);
transportContext = new TransportContext(transportConf, blockHandler, true);
shuffleServer = transportContext.createServer(port, bootstraps);
@@ -203,13 +248,16 @@ protected void serviceInit(Configuration conf) throws Exception {
blockHandler.getAllMetrics().getMetrics().put("numRegisteredConnections",
shuffleServer.getRegisteredConnections());
blockHandler.getAllMetrics().getMetrics().putAll(shuffleServer.getAllMetrics().getMetrics());
String metricsNamespace = _conf.get(SPARK_SHUFFLE_SERVICE_METRICS_NAMESPACE_KEY,
DEFAULT_SPARK_SHUFFLE_SERVICE_METRICS_NAME);
YarnShuffleServiceMetrics serviceMetrics =
new YarnShuffleServiceMetrics(blockHandler.getAllMetrics());
new YarnShuffleServiceMetrics(metricsNamespace, blockHandler.getAllMetrics());

MetricsSystemImpl metricsSystem = (MetricsSystemImpl) DefaultMetricsSystem.instance();
metricsSystem.register(
"sparkShuffleService", "Metrics on the Spark Shuffle Service", serviceMetrics);
logger.info("Registered metrics with Hadoop's DefaultMetricsSystem");
metricsNamespace, "Metrics on the Spark Shuffle Service", serviceMetrics);
logger.info("Registered metrics with Hadoop's DefaultMetricsSystem using namespace '{}'",
metricsNamespace);

logger.info("Started YARN shuffle service for Spark on port {}. " +
"Authentication is {}. Registered executor file is {}", port, authEnabledString,
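Taken together, the serviceInit changes above add two configuration hooks: an optional classpath overlay named spark-shuffle-site.xml whose entries override the NodeManager-provided Hadoop configuration, and a spark.yarn.shuffle.service.metrics.namespace key controlling the metrics2 record name. A minimal sketch of that lookup, mirroring the diff rather than reproducing the service's actual code path (it assumes an overlay resource is present on the classpath):

    import java.net.URL
    import org.apache.hadoop.conf.Configuration

    val conf = new Configuration()                          // stands in for the NodeManager-provided config
    val overlay: URL = Thread.currentThread().getContextClassLoader
      .getResource("spark-shuffle-site.xml")                // SHUFFLE_SERVICE_CONF_OVERLAY_RESOURCE_NAME
    if (overlay != null) {
      conf.addResource(overlay)                             // overlay entries override NodeManager values
    }
    // Namespace under which the shuffle service registers its metrics record
    // (defaults to "sparkShuffleService"):
    val namespace = conf.get("spark.yarn.shuffle.service.metrics.namespace", "sparkShuffleService")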
(next changed file; name not shown in this extract)
@@ -32,9 +32,11 @@
*/
class YarnShuffleServiceMetrics implements MetricsSource {

private final String metricsNamespace;
private final MetricSet metricSet;

YarnShuffleServiceMetrics(MetricSet metricSet) {
YarnShuffleServiceMetrics(String metricsNamespace, MetricSet metricSet) {
this.metricsNamespace = metricsNamespace;
this.metricSet = metricSet;
}

@@ -46,7 +48,7 @@ class YarnShuffleServiceMetrics implements MetricsSource {
*/
@Override
public void getMetrics(MetricsCollector collector, boolean all) {
MetricsRecordBuilder metricsRecordBuilder = collector.addRecord("sparkShuffleService");
MetricsRecordBuilder metricsRecordBuilder = collector.addRecord(metricsNamespace);

for (Map.Entry<String, Metric> entry : metricSet.getMetrics().entrySet()) {
collectMetric(metricsRecordBuilder, entry.getKey(), entry.getValue());
(next changed file; name not shown in this extract)
@@ -101,7 +101,8 @@ private[spark] class ProcfsMetricsGetter(procfsDir: String = "/proc/") extends L
}
}

private def computeProcessTree(): Set[Int] = {
// Exposed for testing
private[executor] def computeProcessTree(): Set[Int] = {
if (!isAvailable || testing) {
return Set()
}
@@ -159,7 +160,8 @@ private[spark] class ProcfsMetricsGetter(procfsDir: String = "/proc/") extends L
}
}

def addProcfsMetricsFromOneProcess(
// Exposed for testing
private[executor] def addProcfsMetricsFromOneProcess(
allMetrics: ProcfsMetrics,
pid: Int): ProcfsMetrics = {

@@ -199,7 +201,7 @@ private[spark] class ProcfsMetricsGetter(procfsDir: String = "/proc/") extends L
case f: IOException =>
logWarning("There was a problem with reading" +
" the stat file of the process. ", f)
ProcfsMetrics(0, 0, 0, 0, 0, 0)
throw f
}
}

@@ -210,11 +212,16 @@ private[spark] class ProcfsMetricsGetter(procfsDir: String = "/proc/") extends L
val pids = computeProcessTree
var allMetrics = ProcfsMetrics(0, 0, 0, 0, 0, 0)
for (p <- pids) {
allMetrics = addProcfsMetricsFromOneProcess(allMetrics, p)
// if we had an error getting any of the metrics, we don't want to report partial metrics, as
// that would be misleading.
if (!isAvailable) {
return ProcfsMetrics(0, 0, 0, 0, 0, 0)
try {
allMetrics = addProcfsMetricsFromOneProcess(allMetrics, p)
// if we had an error getting any of the metrics, we don't want to
// report partial metrics, as that would be misleading.
if (!isAvailable) {
return ProcfsMetrics(0, 0, 0, 0, 0, 0)
}
} catch {
case _: IOException =>
return ProcfsMetrics(0, 0, 0, 0, 0, 0)
}
}
allMetrics
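The change above makes procfs metric collection all-or-nothing: addProcfsMetricsFromOneProcess rethrows the IOException, and computeAllMetrics discards the partial sum instead of reporting it. A hedged, simplified sketch of that pattern (hypothetical signature, not the real one):

    // All-or-nothing aggregation: any read failure invalidates the whole result.
    def aggregate(pids: Set[Int], readOne: Int => Long): Option[Long] =
      try {
        Some(pids.foldLeft(0L)((acc, p) => acc + readOne(p)))
      } catch {
        case _: java.io.IOException => None  // never surface a misleading partial total
      }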
10 changes: 10 additions & 0 deletions core/src/main/scala/org/apache/spark/internal/config/package.scala
@@ -680,6 +680,16 @@ package object config {
private[spark] val SHUFFLE_SERVICE_PORT =
ConfigBuilder("spark.shuffle.service.port").version("1.2.0").intConf.createWithDefault(7337)

private[spark] val SHUFFLE_SERVICE_NAME =
ConfigBuilder("spark.shuffle.service.name")
.doc("The configured name of the Spark shuffle service the client should communicate with. " +
"This must match the name used to configure the Shuffle within the YARN NodeManager " +
"configuration (`yarn.nodemanager.aux-services`). Only takes effect when " +
s"$SHUFFLE_SERVICE_ENABLED is set to true.")
.version("3.2.0")
.stringConf
.createWithDefault("spark_shuffle")

private[spark] val KEYTAB = ConfigBuilder("spark.kerberos.keytab")
.doc("Location of user's keytab.")
.version("3.0.0")
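For context, a hedged usage sketch of the new spark.shuffle.service.name config defined above. The service name shown is an example, not a Spark default; what matters is that the client-side value matches the auxiliary-service name registered with the YARN NodeManager:

    import org.apache.spark.SparkConf

    val conf = new SparkConf()
      .set("spark.shuffle.service.enabled", "true")
      .set("spark.shuffle.service.name", "spark_shuffle_custom")  // example name
    // The NodeManager side (yarn-site.xml) must register the same name, e.g.:
    //   yarn.nodemanager.aux-services = spark_shuffle_custom
    //   yarn.nodemanager.aux-services.spark_shuffle_custom.class =
    //     org.apache.spark.network.yarn.YarnShuffleService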
