Not working with regression problems #139

Closed
mostafam opened this issue Apr 10, 2019 · 13 comments

@mostafam

Hello and thank you for this package!

It seems that this package is not useful when the problem is regression rather than classification. After getting errors on my own data when using this package, I realized it always gives an error for regression problems. As an example, I slightly modified https://github.com/maxpumperla/elephas/blob/master/examples/ml_pipeline_otto.py to turn it into a regression problem from line 61 onwards:

model = Sequential()
model.add(Dense(512, input_shape=(input_dim,)))
model.add(Activation('relu'))
model.add(Dropout(0.5))
model.add(Dense(512))
model.add(Activation('relu'))
model.add(Dropout(0.5))
model.add(Dense(512))
model.add(Activation('relu'))
model.add(Dropout(0.5))
model.add(Dense(1))
model.add(Activation('linear'))

model.compile(optimizer='adam', loss='mean_absolute_error')

sgd = optimizers.SGD(lr=0.01)
sgd_conf = optimizers.serialize(sgd)

# Initialize Elephas Spark ML Estimator
estimator = ElephasEstimator()
estimator.set_keras_model_config(model.to_yaml())
estimator.set_optimizer_config(sgd_conf)
estimator.set_mode("synchronous")
estimator.set_loss("mean_absolute_error")
estimator.set_metrics(['mae'])
estimator.setFeaturesCol("scaled_features")
estimator.setLabelCol("index_category")
estimator.set_epochs(10)
estimator.set_batch_size(128)
estimator.set_num_workers(1)
estimator.set_verbosity(0)
estimator.set_validation_split(0.15)
estimator.set_categorical_labels(False)
# estimator.set_nb_classes(nb_classes)

# Fitting a model returns a Transformer
pipeline = Pipeline(stages=[string_indexer, scaler, estimator])
fitted_pipeline = pipeline.fit(train_df)

# Evaluate Spark model
prediction = fitted_pipeline.transform(train_df)
pnl = prediction.select("index_category", "prediction")
pnl.show(2)

Unfortunately it gives an error:

19/04/10 05:32:34 WARN TaskSetManager: Lost task 0.0 in stage 459.0 (TID 6450, local[0], executor 103): org.apache.spark.api.python.PythonException: Traceback (most recent call last):
  File "/usr/lib/spark/python/pyspark/worker.py", line 372, in main
    process()
  File "/usr/lib/spark/python/pyspark/worker.py", line 367, in process
    serializer.dump_stream(func(split_index, iterator), outfile)
  File "/usr/lib/spark/python/pyspark/serializers.py", line 390, in dump_stream
    vs = list(itertools.islice(iterator, batch))
  File "/usr/lib/spark/python/pyspark/util.py", line 99, in wrapper
    return f(*args, **kwargs)
  File "/usr/lib/spark/python/pyspark/sql/session.py", line 730, in prepare
    verify_func(obj)
  File "/usr/lib/spark/python/pyspark/sql/types.py", line 1389, in verify
    verify_value(obj)
  File "/usr/lib/spark/python/pyspark/sql/types.py", line 1368, in verify_struct
    "length of fields (%d)" % (len(obj), len(verifiers))))
ValueError: Length of object (7) does not match with length of fields (5)

        at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.handlePythonException(PythonRunner.scala:452)
        at org.apache.spark.api.python.PythonRunner$$anon$1.read(PythonRunner.scala:588)
        at org.apache.spark.api.python.PythonRunner$$anon$1.read(PythonRunner.scala:571)
        at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.hasNext(PythonRunner.scala:406)
        at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
        at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:440)
        at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:409)
        at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:409)
        at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:409)
        at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source)
        at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
        at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$11$$anon$1.hasNext(WholeStageCodegenExec.scala:619)
        at org.apache.spark.sql.execution.SparkPlan$$anonfun$2.apply(SparkPlan.scala:255)
        at org.apache.spark.sql.execution.SparkPlan$$anonfun$2.apply(SparkPlan.scala:247)
        at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$24.apply(RDD.scala:836)
        at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$24.apply(RDD.scala:836)
        at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
        at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
        at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
        at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
        at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
        at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
        at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
        at org.apache.spark.scheduler.Task.run(Task.scala:121)
        at org.apache.spark.executor.Executor$TaskRunner$$anonfun$10.apply(Executor.scala:402)
        at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360)
        at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:408)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
        at java.lang.Thread.run(Thread.java:748)

Driver stacktrace:
        at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:2039)
        at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:2027)
        at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:2026)
        at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
        at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
        at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:2026)
        at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:966)
        at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:966)
        at scala.Option.foreach(Option.scala:257)
        at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:966)
        at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2260)
        at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2209)
        at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2198)
        at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
        at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:777)
        at org.apache.spark.SparkContext.runJob(SparkContext.scala:2061)
        at org.apache.spark.SparkContext.runJob(SparkContext.scala:2082)
        at org.apache.spark.SparkContext.runJob(SparkContext.scala:2101)
        at org.apache.spark.sql.execution.SparkPlan.executeTake(SparkPlan.scala:365)
        at org.apache.spark.sql.execution.CollectLimitExec.executeCollect(limit.scala:38)
        at org.apache.spark.sql.Dataset.org$apache$spark$sql$Dataset$$collectFromPlan(Dataset.scala:3384)
        at org.apache.spark.sql.Dataset$$anonfun$head$1.apply(Dataset.scala:2545)
        at org.apache.spark.sql.Dataset$$anonfun$53.apply(Dataset.scala:3365)
        at org.apache.spark.sql.execution.SQLExecution$$anonfun$withNewExecutionId$1.apply(SQLExecution.scala:78)
        at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:125)
        at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:73)
        at org.apache.spark.sql.Dataset.withAction(Dataset.scala:3364)
        at org.apache.spark.sql.Dataset.head(Dataset.scala:2545)
        at org.apache.spark.sql.Dataset.take(Dataset.scala:2759)
        at org.apache.spark.sql.Dataset.getRows(Dataset.scala:255)
        at org.apache.spark.sql.Dataset.showString(Dataset.scala:292)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:498)
        at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
        at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
        at py4j.Gateway.invoke(Gateway.java:282)
        at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
        at py4j.commands.CallCommand.execute(CallCommand.java:79)
        at py4j.GatewayConnection.run(GatewayConnection.java:238)
        at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.spark.api.python.PythonException: Traceback (most recent call last):
  File "/usr/lib/spark/python/pyspark/worker.py", line 372, in main
    process()
  File "/usr/lib/spark/python/pyspark/worker.py", line 367, in process
    serializer.dump_stream(func(split_index, iterator), outfile)
  File "/usr/lib/spark/python/pyspark/serializers.py", line 390, in dump_stream
    vs = list(itertools.islice(iterator, batch))
  File "/usr/lib/spark/python/pyspark/util.py", line 99, in wrapper
    return f(*args, **kwargs)
  File "/usr/lib/spark/python/pyspark/sql/session.py", line 730, in prepare
    verify_func(obj)
  File "/usr/lib/spark/python/pyspark/sql/types.py", line 1389, in verify
    verify_value(obj)
  File "/usr/lib/spark/python/pyspark/sql/types.py", line 1368, in verify_struct
    "length of fields (%d)" % (len(obj), len(verifiers))))
ValueError: Length of object (7) does not match with length of fields (5)
@louis925

Any update on this issue?

@ghost commented Jul 21, 2019

I'm also experiencing the same issue.

@saketrule

Same issue.

@jlsouza23

Same issue too.

@Great1414

Also the same issue. So maybe the package doesn't support regression?
I want to use an autoencoder model in place of the example, but it fails.

@AntonOellerer

Hey, I had a similar problem and fixed it by passing elephas_estimator.set_optimizer_config an SGD optimizer instead of an Adam optimizer.
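
In other words, the change amounts to roughly this (just a sketch; the learning rate is only an example value):

from keras import optimizers

# Serialize an SGD config and hand that to the estimator instead of an Adam config
sgd_conf = optimizers.serialize(optimizers.SGD(lr=0.01))
elephas_estimator.set_optimizer_config(sgd_conf)  # previously: optimizers.serialize(optimizers.Adam(...))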

@leozzz620

Hey, I had a similar problem and fixed it by passing elephas_estimator.set_optimizer_config an SGD optimizer instead of an Adam optimizer.

Hi, how did you fix it? I tried your method but I still get the error.

@AntonOellerer

That's all I did; here is the code excerpt, if it helps you:

from keras.models import Sequential
from keras.layers.core import Dense, Dropout, Activation
from keras.layers import LeakyReLU
from keras.optimizers import SGD, Adam
from keras import optimizers
from elephas.ml_model import ElephasEstimator
from pyspark.ml import Pipeline

model = Sequential()
model.add(Dense(128, input_dim=transformed_train_sample.head()['features'].size))
model.add(LeakyReLU(alpha=0.1))
model.add(Dense(64))
model.add(LeakyReLU(alpha=0.1))
model.add(Dense(32))
model.add(LeakyReLU(alpha=0.1))
model.add(Dense(16))
model.add(Activation('sigmoid'))
adam = optimizers.Adam(lr = 0.00001)
model.compile(loss='mean_squared_error', optimizer=adam)

sgd = optimizers.SGD(lr=0.01)

opt_conf = optimizers.serialize(sgd)

estimator = ElephasEstimator()
estimator.set_keras_model_config(model.to_yaml())
estimator.set_batch_size(32)
estimator.set_frequency('batch')
estimator.set_categorical_labels(True)
estimator.set_nb_classes(16)
estimator.setLabelCol("label_index")
estimator.setFeaturesCol("features")
estimator.setOutputCol("predictions")
estimator.set_loss('mean_squared_error')
estimator.set_optimizer_config(opt_conf)

estimator_pipeline = Pipeline(stages=[estimator])
estimator_model = estimator_pipeline.fit(transformed_train_sample)
#estimator_model.save("estimator_model")

eval_predictions = estimator_model.transform(transformed_eval_sample)

@leozzz620

That's all I did; here is the code excerpt, if it helps you: […]


Hi, I think you are doing classification work, not regression.

@Caveman08

Caveman08 commented Aug 6, 2020

As @leozzz620 commented, what you have shown so far is not a regression. Changing the optimizer wouldn't be enough for .fit to know you are working on something other than a classification problem.

I'm trying to scour for some info on applying elephas to a regression problem; does the package even support it? The fit at the end should be applied to x_train, y_train and validated on x_test, y_test, and that doesn't seem to be happening. Has anyone made any progress? I'm not sure that the PySpark ML pipeline even supports regression.

I've made it up to the model fitting stage. Without this working, I'd have to undo all the work of vectorizing the features in the first place, and I'm not sure pandas can handle a dataframe of the magnitude I'm trying to work with.

from keras import optimizers
from elephas.ml_model import ElephasEstimator
from pyspark.ml import Pipeline

## Serialize optimizer (sgd is an optimizers.SGD instance defined earlier in the script)
sgd_conf = optimizers.serialize(sgd)

##Initialize Elephas Spark ML Estimator
estimator = ElephasEstimator()
estimator.set_keras_model_config(model.to_yaml())
estimator.set_num_workers(1)
estimator.set_epochs(100)
estimator.set_batch_size(10)
estimator.set_validation_split(0.10)
estimator.setLabelCol("y_label")
estimator.setFeaturesCol("features")
estimator.setOutputCol("predictions")
estimator.set_loss('mean_squared_error')
estimator.set_metrics(['mse', 'mae'])
estimator.set_optimizer_config(sgd_conf)

estimator_pipeline = Pipeline(stages=[estimator])
estimator_model = estimator_pipeline.fit(X_train, Y_train, validation_data = (X_test, Y_test)) ## obv doesn't accept validation param
estimator_model.save("estimator_model")

eval_predictions = estimator_model.transform(transformed_eval_sample)
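
From what I can tell, the Spark ML way would be to hand fit a single DataFrame and let the estimator split off the validation set itself, something like this (train_df and eval_df are placeholder names for Spark DataFrames with "features" and "y_label" columns; just a sketch, I haven't verified it):

estimator.set_validation_split(0.10)  # validation is handled by the estimator, not by fit()

estimator_pipeline = Pipeline(stages=[estimator])
estimator_model = estimator_pipeline.fit(train_df)       # fit takes one Spark DataFrame
eval_predictions = estimator_model.transform(eval_df)    # predictions on held-out data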

@AntonOellerer

Sorry, I just commented since the error messages are really similar.

@danielenricocahall
Collaborator

Support for regression was introduced in 0.4.5 - please retry and let me know if you run into any issues!
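
For reference, a minimal regression setup would look roughly like the following (column names, layer sizes, hyperparameters, and the input_dim / train_df variables are illustrative placeholders, not a verified snippet):

from keras.models import Sequential
from keras.layers import Dense
from keras import optimizers
from elephas.ml_model import ElephasEstimator
from pyspark.ml import Pipeline

# Simple regression model: single linear output, no softmax
model = Sequential()
model.add(Dense(64, activation='relu', input_shape=(input_dim,)))
model.add(Dense(1, activation='linear'))
model.compile(optimizer='adam', loss='mean_absolute_error')

estimator = ElephasEstimator()
estimator.set_keras_model_config(model.to_yaml())
estimator.set_optimizer_config(optimizers.serialize(optimizers.SGD(lr=0.01)))
estimator.set_mode("synchronous")
estimator.set_loss("mean_absolute_error")
estimator.set_metrics(['mae'])
estimator.set_categorical_labels(False)   # key difference from the classification examples
estimator.set_epochs(10)
estimator.set_batch_size(128)
estimator.set_num_workers(1)
estimator.setFeaturesCol("features")
estimator.setLabelCol("label")

pipeline = Pipeline(stages=[estimator])
fitted_pipeline = pipeline.fit(train_df)  # train_df: Spark DataFrame with "features" and "label" columns
predictions = fitted_pipeline.transform(train_df).select("label", "prediction")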

@SumeyyeSuslu

It seems to be broken with newer versions. I ran the regression example on Google Colab and it gave these results:

print(metrics.r2)                    # -57287.893765051085
print(metrics.meanAbsoluteError)     # 22.850576681614513
print(metrics.rootMeanSquaredError)  # 24.626286489524432
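
For context, metrics like these are typically computed along the following lines (a rough sketch with assumed column names; the regression example itself may differ):

from pyspark.mllib.evaluation import RegressionMetrics

# RDD of (prediction, observation) float pairs; column names here are assumptions
pred_and_label = prediction_df.select("prediction", "label").rdd.map(
    lambda row: (float(row[0]), float(row[1]))
)
metrics = RegressionMetrics(pred_and_label)  # exposes r2, meanAbsoluteError, rootMeanSquaredError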
