Dav*_*bii 2 python apache-spark
我在将多个列从分类值转换为数值时遇到问题。我正在使用 PySpark,但我确信问题不在于我使用的 Spark 版本。使用一列时没有问题,但在转换多列时遇到问题。这是代码,并且没有缺失值:
\n\nfrom pyspark.ml import Pipeline\nfrom pyspark.ml.feature import StringIndexer, OneHotEncoder, VectorAssembler\ncategorical_columns= [\'age\',\'job\', \'marital\',\'education\', \'default\', \'housing\', \'loan\', \'poutcome\', \'y\']\n\nindexers = [\n StringIndexer(inputCol=c, outputCol="{0}_indexed".format(c))\n for c in categorical_columns\n]\n\nencoders = [OneHotEncoder(dropLast=False,inputCol=indexer.getOutputCol(),\n outputCol="{0}_encoded".format(indexer.getOutputCol())) \n for indexer in indexers\n]\n\n# Vectorizing encoded values\nassembler = VectorAssembler(inputCols=[encoder.getOutputCol() for encoder in encoders],outputCol="features")\n\npipeline = Pipeline(stages=indexers + encoders+[assembler])\nmodel=pipeline.fit(df2)\ntransformed = model.transform(df2)\ntransformed.show(5)\nRun Code Online (Sandbox Code Playgroud)\n\n输出是:
\n\n---------------------------------------------------------------------------\nPy4JJavaError Traceback (most recent call last)\n<ipython-input-48-452b475faf1a> in <module>\n 20 \n 21 pipeline = Pipeline(stages=indexers + encoders+[assembler])\n---> 22 model=pipeline.fit(df2)\n 23 transformed = model.transform(df2)\n 24 transformed.show(5)\n\nE:\\spark-2.4.2-bin-hadoop2.7\\python\\pyspark\\ml\\base.py in fit(self, dataset, params)\n 130 return self.copy(params)._fit(dataset)\n 131 else:\n--> 132 return self._fit(dataset)\n 133 else:\n 134 raise ValueError("Params must be either a param map or a list/tuple of param maps, "\n\nE:\\spark-2.4.2-bin-hadoop2.7\\python\\pyspark\\ml\\pipeline.py in _fit(self, dataset)\n 107 dataset = stage.transform(dataset)\n 108 else: # must be an Estimator\n--> 109 model = stage.fit(dataset)\n 110 transformers.append(model)\n 111 if i < indexOfLastEstimator:\n\nE:\\spark-2.4.2-bin-hadoop2.7\\python\\pyspark\\ml\\base.py in fit(self, dataset, params)\n 130 return self.copy(params)._fit(dataset)\n 131 else:\n--> 132 return self._fit(dataset)\n 133 else:\n 134 raise ValueError("Params must be either a param map or a list/tuple of param maps, "\n\nE:\\spark-2.4.2-bin-hadoop2.7\\python\\pyspark\\ml\\wrapper.py in _fit(self, dataset)\n 293 \n 294 def _fit(self, dataset):\n--> 295 java_model = self._fit_java(dataset)\n 296 model = self._create_model(java_model)\n 297 return self._copyValues(model)\n\nE:\\spark-2.4.2-bin-hadoop2.7\\python\\pyspark\\ml\\wrapper.py in _fit_java(self, dataset)\n 290 """\n 291 self._transfer_params_to_java()\n--> 292 return self._java_obj.fit(dataset._jdf)\n 293 \n 294 def _fit(self, dataset):\n\nE:\\spark-2.4.2-bin-hadoop2.7\\python\\lib\\py4j-0.10.7-src.zip\\py4j\\java_gateway.py in __call__(self, *args)\n 1255 answer = self.gateway_client.send_command(command)\n 1256 return_value = get_return_value(\n-> 1257 answer, self.gateway_client, self.target_id, self.name)\n 1258 \n 1259 for temp_arg in 
temp_args:\n\nE:\\spark-2.4.2-bin-hadoop2.7\\python\\pyspark\\sql\\utils.py in deco(*a, **kw)\n 61 def deco(*a, **kw):\n 62 try:\n---> 63 return f(*a, **kw)\n 64 except py4j.protocol.Py4JJavaError as e:\n 65 s = e.java_exception.toString()\n\nE:\\spark-2.4.2-bin-hadoop2.7\\python\\lib\\py4j-0.10.7-src.zip\\py4j\\protocol.py in get_return_value(answer, gateway_client, target_id, name)\n 326 raise Py4JJavaError(\n 327 "An error occurred while calling {0}{1}{2}.\\n".\n--> 328 format(target_id, ".", name), value)\n 329 else:\n 330 raise Py4JError(\n\nPy4JJavaError: An error occurred while calling o1833.fit.\n: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 79.0 failed 1 times, most recent failure: Lost task 0.0 in stage 79.0 (TID 72, localhost, executor driver): java.io.FileNotFoundException: C:\\Users\\user\\AppData\\Local\\Temp\\blockmgr-11928db3-60f2-407b-b821-1338f779e3b5\\0d\\shuffle_30_0_0.data.6d622104-8179-4873-9b10-16afe2a61081 (The system cannot find the path specified)\n at java.io.FileOutputStream.open0(Native Method)\n at java.io.FileOutputStream.open(FileOutputStream.java:270)\n at java.io.FileOutputStream.<init>(FileOutputStream.java:213)\n at org.apache.spark.storage.DiskBlockObjectWriter.initialize(DiskBlockObjectWriter.scala:103)\n at org.apache.spark.storage.DiskBlockObjectWriter.open(DiskBlockObjectWriter.scala:116)\n at org.apache.spark.storage.DiskBlockObjectWriter.write(DiskBlockObjectWriter.scala:237)\n at org.apache.spark.util.collection.WritablePartitionedPairCollection$$anon$1.writeNext(WritablePartitionedPairCollection.scala:56)\n at org.apache.spark.util.collection.ExternalSorter.writePartitionedFile(ExternalSorter.scala:699)\n at org.apache.spark.shuffle.sort.SortShuffleWriter.write(SortShuffleWriter.scala:71)\n at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:99)\n at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:55)\n at 
org.apache.spark.scheduler.Task.run(Task.scala:121)\n at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:411)\n at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360)\n at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:414)\n at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)\n at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)\n at java.lang.Thread.run(Thread.java:748)\n\nDriver stacktrace:\n at org.apache.spark.scheduler.DAGScheduler.failJobAndIndependentStages(DAGScheduler.scala:1889)\n at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2(DAGScheduler.scala:1877)\n at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2$adapted(DAGScheduler.scala:1876)\n at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)\n at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)\n at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)\n at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1876)\n at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1(DAGScheduler.scala:926)\n at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1$adapted(DAGScheduler.scala:926)\n at scala.Option.foreach(Option.scala:274)\n at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:926)\n at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2110)\n at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2059)\n at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2048)\n at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)\n at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:737)\n at org.apache.spark.SparkContext.runJob(SparkContext.scala:2061)\n at 
org.apache.spark.SparkContext.runJob(SparkContext.scala:2082)\n at org.apache.spark.SparkContext.runJob(SparkContext.scala:2101)\n at org.apache.spark.SparkContext.runJob(SparkContext.scala:2126)\n at org.apache.spark.rdd.RDD.$anonfun$collect$1(RDD.scala:945)\n at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)\n at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)\n at org.apache.spark.rdd.RDD.withScope(RDD.scala:363)\n at org.apache.spark.rdd.RDD.collect(RDD.scala:944)\n at org.apache.spark.rdd.PairRDDFunctions.$anonfun$countByKey$1(PairRDDFunctions.scala:370)\n at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)\n at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)\n at org.apache.spark.rdd.RDD.withScope(RDD.scala:363)\n at org.apache.spark.rdd.PairRDDFunctions.countByKey(PairRDDFunctions.scala:370)\n at org.apache.spark.rdd.RDD.$anonfun$countByValue$1(RDD.scala:1214)\n at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)\n at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)\n at org.apache.spark.rdd.RDD.withScope(RDD.scala:363)\n at org.apache.spark.rdd.RDD.countByValue(RDD.scala:1214)\n at org.apache.spark.ml.feature.StringIndexer.fit(StringIndexer.scala:140)\n at org.apache.spark.ml.feature.StringIndexer.fit(StringIndexer.scala:109)\n at sun.reflect.GeneratedMethodAccessor87.invoke(Unknown Source)\n at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)\n at java.lang.reflect.Method.invoke(Method.java:498)\n at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)\n at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)\n at py4j.Gateway.invoke(Gateway.java:282)\n at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)\n at py4j.commands.CallCommand.execute(CallCommand.java:79)\n at 
py4j.GatewayConnection.run(GatewayConnection.java:238)\n at java.lang.Thread.run(Thread.java:748)\nCaused by: java.io.FileNotFoundException: C:\\Users\\user\\AppData\\Local\\Temp\\blockmgr-11928db3-60f2-407b-b821-1338f779e3b5\\0d\\shuffle_30_0_0.data.6d622104-8179-4873-9b10-16afe2a61081 (The system cannot find the path specified)\n at java.io.FileOutputStream.open0(Native Method)\n at java.io.FileOutputStream.open(FileOutputStream.java:270)\n at java.io.FileOutputStream.<init>(FileOutputStream.java:213)\n at org.apache.spark.storage.DiskBlockObjectWriter.initialize(DiskBlockObjectWriter.scala:103)\n at org.apache.spark.storage.DiskBlockObjectWriter.open(DiskBlockObjectWriter.scala:116)\n at org.apache.spark.storage.DiskBlockObjectWriter.write(DiskBlockObjectWriter.scala:237)\n at org.apache.spark.util.collection.WritablePartitionedPairCollection$$anon$1.writeNext(WritablePartitionedPairCollection.scala:56)\n at org.apache.spark.util.collection.ExternalSorter.writePartitionedFile(ExternalSorter.scala:699)\n at org.apache.spark.shuffle.sort.SortShuffleWriter.write(SortShuffleWriter.scala:71)\n at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:99)\n at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:55)\n at org.apache.spark.scheduler.Task.run(Task.scala:121)\n at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:411)\n at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360)\n at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:414)\n at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)\n at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)\n ... 1 more\nRun Code Online (Sandbox Code Playgroud)\n\n任何帮助将不胜感激
 
\n需要将多个列从分类值转换为数值的方法是对每列使用索引器和编码器,然后使用向量汇编器。在使用矢量汇编器之前,我还添加了一个最小-最大缩放器,如下所示:
# Index and one-hot encode every categorical column, one column at a time.
# The eight copy-pasted stanzas collapse into a single loop; each iteration
# produces the same "<col>_index" and "<col>_vec" columns the original did.
# Fitting column-by-column (rather than one big Pipeline) also keeps each
# shuffle small — NOTE(review): the question's FileNotFoundException on the
# Windows temp dir presumably stems from the single-pipeline fit; verify.
categorical_cols = ['job', 'marital', 'education', 'default',
                    'housing', 'loan', 'poutcome', 'y']

for c in categorical_cols:
    # StringIndexer maps each category string to a numeric index (0.0, 1.0, ...).
    indexer_model = StringIndexer(inputCol=c, outputCol=c + "_index").fit(df2)
    indexed = indexer_model.transform(df2)
    # OneHotEncoder is a plain Transformer in Spark 2.x (no fit needed);
    # dropLast=False keeps one vector slot per category level.
    encoder = OneHotEncoder(dropLast=False,
                            inputCol=c + "_index",
                            outputCol=c + "_vec")
    df2 = encoder.transform(indexed)

df2.show(4)
# Scale every one-hot vector column into [0, 1] with MinMaxScaler.
# NOTE(review): one-hot vectors are already 0/1, so scaling is a no-op in
# most cases — presumably kept for downstream-model consistency; confirm.
cols = ['job', 'marital', 'education', 'default', 'housing', 'loan', 'poutcome', 'y']
# Loop variable renamed from `col`: that name shadows the builtin and the
# commonly imported pyspark.sql.functions.col.
for name in cols:
    scaler = MinMaxScaler(inputCol=name + "_vec", outputCol=name + "_vec_scaled")
    # MinMaxScaler is an Estimator: fit to learn per-slot min/max, then transform.
    df2 = scaler.fit(df2).transform(df2)
df2.show(4)
# Assemble all scaled category vectors into a single 'features' column.
# The original line was garbled by stray backticks in the scrape; this is
# the syntactically valid reconstruction. 'y' is the label and is
# deliberately left out of the feature vector.
vecAssembler = VectorAssembler(
    inputCols=['job_vec_scaled', 'marital_vec_scaled', 'education_vec_scaled',
               'default_vec_scaled', 'housing_vec_scaled', 'loan_vec_scaled',
               'poutcome_vec_scaled'],
    outputCol='features')
df3 = vecAssembler.transform(df2)
df3.show(4)
Run Code Online (Sandbox Code Playgroud)
| 归档时间: |
|
| 查看次数: |
9309 次 |
| 最近记录: |