MapType 在 Spark 3.x 中导致 AnalysisException : Encoders.bean 到包含 map<String, someClass> 的对象失败,这在 Spark 2.4 中工作正常

Mar*_*han 7 java apache-spark

尝试将我的Java Spark代码从迁移2.4到 时3.x,我有一个包含MapType.

/**
 * Renvoyer le schéma du Dataset.
 * @return Schema.
 */
public StructType schemaEntreprise() {
   StructType schema = new StructType()
      .add("siren", StringType, false)
      .add("statutDiffusionUniteLegale", StringType, true)
      .add("unitePurgeeUniteLegale", StringType, true )
      .add("dateCreationEntreprise", StringType, true)
      .add("sigle", StringType, true)
     
   /* ... and other fields mostly of String, Integer, Boolean type... */
   
   // Ajouter au Dataset des entreprises la liaison avec les établissements.
   MapType mapEtablissements = new MapType(StringType,
this.datasetEtablissement.schemaEtablissement(), true);
   StructField etablissements = new StructField("etablissements",
mapEtablissements, true, Metadata.empty());
   schema.add(etablissements);
   schema.add("libelleCategorieJuridique", StringType, true);
   schema.add("partition", StringType, true);
   
   return schema;
}
Run Code Online (Sandbox Code Playgroud)

Dataset<Etablissement>与客体的企业Etablissment只有在他们的原语类型:

public StructType schemaEtablissement() {
   return new StructType()
      .add("siren", StringType, false)
      .add("nic", StringType, false)
      .add("siret", StringType, false)
      .add("statutDiffusionEtablissement", StringType, true)
      .add("dateCreationEtablissement", StringType, true)
         
      .add("trancheEffectifSalarie", StringType, true)
   [...]
Run Code Online (Sandbox Code Playgroud)
public class Etablissement extends AbstractSirene<SIRET> implements Comparable<Etablissement> {
   /** Serial ID. */
   private static final long serialVersionUID = 2451240618966775942L;
   
   /** Année et mois de création de l'établissement. */
   private String dateCreation;
   
   /** Qualité de siège ou non de l'établissement */
   private boolean siege;

   /** Enseigne 1 ou nom de l'exploitation */
   private String enseigne1;
   
   /** Enseigne 2 ou nom de l'exploitation */
   private String enseigne2;
   [...]
Run Code Online (Sandbox Code Playgroud)

Entreprise数据集在Spark 2.4 中完美运行但是当在操作中的Spark 3.0.1 中使用时,它的分析阶段以一条不清楚的消息结束:

org.apache.spark.sql.AnalysisException: *Can't extract value from lambdavariable(MapObject, StringType, true, 376)*: need struct type but got string;


编辑:我添加了关于我的问题的新信息:这不是一个spark.sql.legacy.allowHashOnMapType=true缺失的问题。添加它并不能解决它。

Spark 3尝试执行 : Encoders.bean(Entreprise.class)以创建具有此类的企业对象时会出现问题:

public class Entreprise extends AbstractSirene<SIREN> implements Comparable<Entreprise> {
   /** Serial ID. */
   private static final long serialVersionUID = 2451240618966775942L;
   
   /** Liste des établissements de l'entreprise. */
   private Map<String, Etablissement> etablissements = new HashMap<>();
   
   /** Sigle de l'entreprise */
   private String sigle;
   
   /** Nom de naissance */
   private String nomNaissance;

   [...]   
   /**
    * Renvoyer la liste des établissements de l'entreprise.
    * @return Liste des établissements.
    */
   public Map<String, Etablissement> getEtablissements() {
      return this.etablissements;
   }

   /**
    * Fixer la liste des établissements de l'entreprise.
    * @param etablissementsEntreprise Liste des établissements.
    */
   public void setEtablissements(Map<String, Etablissement> etablissementsEntreprise) {
      this.etablissements = etablissementsEntreprise;
   }

   /**
    * Renvoyer le sigle (forme réduite de la raison sociale ou de la dénomination d'une personne morale ou d'un organisme public) (SIGLE).
    * @return Sigle. 
    */
   public String getSigle() {
      return this.sigle;
   }

   /**
    * Fixer le sigle (forme réduite de la raison sociale ou de la dénomination d'une personne morale ou d'un organisme public) (SIGLE).
    * @param sigle Sigle. 
    */
   public void setSigle(String sigle) {
      this.sigle = sigle;
   }

   /**
    * Renvoyer le nom de naissance pour une personne physique (NOM).
    * @return Nom de naissance pour une personne physique.
    */
   public String getNomNaissance() {
      return this.nomNaissance;
   }

   /**
    * Fixer le nom de naissance pour une personne physique (NOM).
    * @param nom Nom de naissance pour une personne physique.
    */
   public void setNomNaissance(String nom) {
      this.nomNaissance = nom;
   }

   [...]
}
Run Code Online (Sandbox Code Playgroud)

调试显示 Scala 在这里失败了:

org.apache.spark.sql.AnalysisException: Can't extract value from lambdavariable(MapObject, StringType, true, 32): need struct type but got string;
    at org.apache.spark.sql.catalyst.expressions.ExtractValue$.apply(complexTypeExtractors.scala:73)
    at org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveDeserializer$$anonfun$apply$31$$anonfun$applyOrElse$170$$anonfun$10$$anonfun$applyOrElse$172.applyOrElse(Analyzer.scala:3076)
    at org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveDeserializer$$anonfun$apply$31$$anonfun$applyOrElse$170$$anonfun$10$$anonfun$applyOrElse$172.applyOrElse(Analyzer.scala:3074)
    at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$transformUp$2(TreeNode.scala:333)
    at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:72)
    at org.apache.spark.sql.catalyst.trees.TreeNode.transformUp(TreeNode.scala:333)
    at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$transformUp$1(TreeNode.scala:330)
    at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$mapChildren$1(TreeNode.scala:399)
    at org.apache.spark.sql.catalyst.trees.TreeNode.mapProductIterator(TreeNode.scala:237)
    at org.apache.spark.sql.catalyst.trees.TreeNode.mapChildren(TreeNode.scala:397)
    at org.apache.spark.sql.catalyst.trees.TreeNode.mapChildren(TreeNode.scala:350)
    at org.apache.spark.sql.catalyst.trees.TreeNode.transformUp(TreeNode.scala:330)
[...]
    at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$mapChildren$2(TreeNode.scala:416)
    at scala.collection.MapLike$MappedValues.$anonfun$iterator$3(MapLike.scala:257)
    at scala.collection.Iterator$$anon$10.next(Iterator.scala:459)
    at scala.collection.Iterator.foreach(Iterator.scala:941)
    at scala.collection.Iterator.foreach$(Iterator.scala:941)
    at scala.collection.AbstractIterator.foreach(Iterator.scala:1429)
    at scala.collection.IterableLike.foreach(IterableLike.scala:74)
    at scala.collection.IterableLike.foreach$(IterableLike.scala:73)
    at scala.collection.IterableLike$$anon$1.foreach(IterableLike.scala:331)
    at scala.collection.generic.Growable.$plus$plus$eq(Growable.scala:62)
    at scala.collection.generic.Growable.$plus$plus$eq$(Growable.scala:53)
    at scala.collection.mutable.MapBuilder.$plus$plus$eq(MapBuilder.scala:28)
    at scala.collection.TraversableViewLike.force(TraversableViewLike.scala:91)
    at scala.collection.TraversableViewLike.force$(TraversableViewLike.scala:89)
    at scala.collection.IterableLike$$anon$1.force(IterableLike.scala:331)
    at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$mapChildren$1(TreeNode.scala:424)
    at org.apache.spark.sql.catalyst.trees.TreeNode.mapProductIterator(TreeNode.scala:237)
    at org.apache.spark.sql.catalyst.trees.TreeNode.mapChildren(TreeNode.scala:397)
    at org.apache.spark.sql.catalyst.trees.TreeNode.mapChildren(TreeNode.scala:350)
    at org.apache.spark.sql.catalyst.trees.TreeNode.transformUp(TreeNode.scala:330)
    at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$transformUp$1(TreeNode.scala:330)
    at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$mapChildren$1(TreeNode.scala:399)
    at org.apache.spark.sql.catalyst.trees.TreeNode.mapProductIterator(TreeNode.scala:237)
    at org.apache.spark.sql.catalyst.trees.TreeNode.mapChildren(TreeNode.scala:397)
    at org.apache.spark.sql.catalyst.trees.TreeNode.mapChildren(TreeNode.scala:350)
    at org.apache.spark.sql.catalyst.trees.TreeNode.transformUp(TreeNode.scala:330)
    at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$transformUp$1(TreeNode.scala:330)
    at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$mapChildren$1(TreeNode.scala:399)
    at org.apache.spark.sql.catalyst.trees.TreeNode.mapProductIterator(TreeNode.scala:237)
    at org.apache.spark.sql.catalyst.trees.TreeNode.mapChildren(TreeNode.scala:397)
    at org.apache.spark.sql.catalyst.trees.TreeNode.mapChildren(TreeNode.scala:350)
    at org.apache.spark.sql.catalyst.trees.TreeNode.transformUp(TreeNode.scala:330)
    at org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveDeserializer$$anonfun$apply$31$$anonfun$applyOrElse$170$$anonfun$10.applyOrElse(Analyzer.scala:3074)
    at org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveDeserializer$$anonfun$apply$31$$anonfun$applyOrElse$170$$anonfun$10.applyOrElse(Analyzer.scala:3070)
    at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$transformDown$1(TreeNode.scala:309)
    at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:72)
    at org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:309)
    at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$transformDown$3(TreeNode.scala:314)
    at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$mapChildren$1(TreeNode.scala:399)
    at org.apache.spark.sql.catalyst.trees.TreeNode.mapProductIterator(TreeNode.scala:237)
    at org.apache.spark.sql.catalyst.trees.TreeNode.mapChildren(TreeNode.scala:397)
    at org.apache.spark.sql.catalyst.trees.TreeNode.mapChildren(TreeNode.scala:350)
    at org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:314)
    at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$transformDown$3(TreeNode.scala:314)
    at org.apache.spark.sql.catalyst.trees.TreeNode.mapChild$2(TreeNode.scala:368)
    at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$mapChildren$4(TreeNode.scala:427)
    at scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:238)
    at scala.collection.immutable.List.foreach(List.scala:392)
    at scala.collection.TraversableLike.map(TraversableLike.scala:238)
    at scala.collection.TraversableLike.map$(TraversableLike.scala:231)
    at scala.collection.immutable.List.map(List.scala:298)
    at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$mapChildren$1(TreeNode.scala:427)
    at org.apache.spark.sql.catalyst.trees.TreeNode.mapProductIterator(TreeNode.scala:237)
    at org.apache.spark.sql.catalyst.trees.TreeNode.mapChildren(TreeNode.scala:397)
    at org.apache.spark.sql.catalyst.trees.TreeNode.mapChildren(TreeNode.scala:350)
    at org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:314)
    at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$transformDown$3(TreeNode.scala:314)
    at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$mapChildren$2(TreeNode.scala:416)
    at scala.collection.MapLike$MappedValues.$anonfun$iterator$3(MapLike.scala:257)
    at scala.collection.Iterator$$anon$10.next(Iterator.scala:459)
    at scala.collection.Iterator.foreach(Iterator.scala:941)
    at scala.collection.Iterator.foreach$(Iterator.scala:941)
    at scala.collection.AbstractIterator.foreach(Iterator.scala:1429)
    at scala.collection.IterableLike.foreach(IterableLike.scala:74)
    at scala.collection.IterableLike.foreach$(IterableLike.scala:73)
    at scala.collection.IterableLike$$anon$1.foreach(IterableLike.scala:331)
    at scala.collection.generic.Growable.$plus$plus$eq(Growable.scala:62)
    at scala.collection.generic.Growable.$plus$plus$eq$(Growable.scala:53)
    at scala.collection.mutable.MapBuilder.$plus$plus$eq(MapBuilder.scala:28)
    at scala.collection.TraversableViewLike.force(TraversableViewLike.scala:91)
    at scala.collection.TraversableViewLike.force$(TraversableViewLike.scala:89)
    at scala.collection.IterableLike$$anon$1.force(IterableLike.scala:331)
    at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$mapChildren$1(TreeNode.scala:424)
    at org.apache.spark.sql.catalyst.trees.TreeNode.mapProductIterator(TreeNode.scala:237)
    at org.apache.spark.sql.catalyst.trees.TreeNode.mapChildren(TreeNode.scala:397)
    at org.apache.spark.sql.catalyst.trees.TreeNode.mapChildren(TreeNode.scala:350)
    at org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:314)
[...]
    at org.apache.spark.sql.catalyst.analysis.Analyzer.execute(Analyzer.scala:170)
    at org.apache.spark.sql.catalyst.encoders.ExpressionEncoder.resolveAndBind(ExpressionEncoder.scala:349)
    at org.apache.spark.sql.Dataset.resolvedEnc$lzycompute(Dataset.scala:252)
    at org.apache.spark.sql.Dataset.org$apache$spark$sql$Dataset$$resolvedEnc(Dataset.scala:251)
    at org.apache.spark.sql.Dataset$.apply(Dataset.scala:83)
    at org.apache.spark.sql.Dataset.as(Dataset.scala:475)
    at fr.ecoemploi.spark.dataset.entreprise.EntrepriseDataset.toDatasetEntreprise(EntrepriseDataset.java:320)
    at fr.ecoemploi.spark.dataset.entreprise.EntrepriseDataset.dsEntreprises(EntrepriseDataset.java:307)
    at fr.ecoemploi.spark.dataset.entreprise.EntrepriseDataset.collectEntreprisesEtEtablissements(EntrepriseDataset.java:366)
    at fr.ecoemploi.spark.dataset.entreprise.EntrepriseDatasetIT.entreprisesEtEtablissementsDeDouarnenez(EntrepriseDatasetIT.java:189)
    at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.base/java.lang.reflect.Method.invoke(Method.java:566)
    at org.junit.platform.commons.util.ReflectionUtils.invokeMethod(ReflectionUtils.java:688)
    at org.junit.jupiter.engine.execution.MethodInvocation.proceed(MethodInvocation.java:60)
    at org.junit.jupiter.engine.execution.InvocationInterceptorChain$ValidatingInvocation.proceed(InvocationInterceptorChain.java:131)
    at org.junit.jupiter.engine.extension.TimeoutExtension.intercept(TimeoutExtension.java:149)
    at org.junit.jupiter.engine.extension.TimeoutExtension.interceptTestableMethod(TimeoutExtension.java:140)
    at org.junit.jupiter.engine.extension.TimeoutExtension.interceptTestMethod(TimeoutExtension.java:84)
    at org.junit.jupiter.engine.execution.ExecutableInvoker$ReflectiveInterceptorCall.lambda$ofVoidMethod$0(ExecutableInvoker.java:115)
    at org.junit.jupiter.engine.execution.ExecutableInvoker.lambda$invoke$0(ExecutableInvoker.java:105)
    at org.junit.jupiter.engine.execution.InvocationInterceptorChain$InterceptedInvocation.proceed(InvocationInterceptorChain.java:106)
    at org.junit.jupiter.engine.execution.InvocationInterceptorChain.proceed(InvocationInterceptorChain.java:64)
    at org.junit.jupiter.engine.execution.InvocationInterceptorChain.chainAndInvoke(InvocationInterceptorChain.java:45)
    at org.junit.jupiter.engine.execution.InvocationInterceptorChain.invoke(InvocationInterceptorChain.java:37)
    at org.junit.jupiter.engine.execution.ExecutableInvoker.invoke(ExecutableInvoker.java:104)
    at org.junit.jupiter.engine.execution.ExecutableInvoker.invoke(ExecutableInvoker.java:98)
    at org.junit.jupiter.engine.descriptor.TestMethodTestDescriptor.lambda$invokeTestMethod$6(TestMethodTestDescriptor.java:210)
    at org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73)
    at org.junit.jupiter.engine.descriptor.TestMethodTestDescriptor.invokeTestMethod(TestMethodTestDescriptor.java:206)
    at org.junit.jupiter.engine.descriptor.TestMethodTestDescriptor.execute(TestMethodTestDescriptor.java:131)
    at org.junit.jupiter.engine.descriptor.TestMethodTestDescriptor.execute(TestMethodTestDescriptor.java:65)
    at org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$5(NodeTestTask.java:139)
    at org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73)
    at org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$7(NodeTestTask.java:129)
    at org.junit.platform.engine.support.hierarchical.Node.around(Node.java:137)
    at org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$8(NodeTestTask.java:127)
    at org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73)
    at org.junit.platform.engine.support.hierarchical.NodeTestTask.executeRecursively(NodeTestTask.java:126)
    at org.junit.platform.engine.support.hierarchical.NodeTestTask.execute(NodeTestTask.java:84)
    at java.base/java.util.ArrayList.forEach(ArrayList.java:1541)
    at org.junit.platform.engine.support.hierarchical.SameThreadHierarchicalTestExecutorService.invokeAll(SameThreadHierarchicalTestExecutorService.java:38)[...]
    at org.eclipse.jdt.internal.junit.runner.RemoteTestRunner.main(RemoteTestRunner.java:210)
Run Code Online (Sandbox Code Playgroud)

org.apache.spark.sql.catalyst.expressions.ExtractValue$.apply(complexTypeExtractors.scala就是这一个,但我没有斯卡拉的知识,我不知道是什么期待:

在此处输入图片说明


在一切正常的情况下(=> in Spark 2.4.7),下面的单元测试给出了他旁边的结果:

org.apache.spark.sql.AnalysisException: Can't extract value from lambdavariable(MapObject, StringType, true, 32): need struct type but got string;
    at org.apache.spark.sql.catalyst.expressions.ExtractValue$.apply(complexTypeExtractors.scala:73)
    at org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveDeserializer$$anonfun$apply$31$$anonfun$applyOrElse$170$$anonfun$10$$anonfun$applyOrElse$172.applyOrElse(Analyzer.scala:3076)
    at org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveDeserializer$$anonfun$apply$31$$anonfun$applyOrElse$170$$anonfun$10$$anonfun$applyOrElse$172.applyOrElse(Analyzer.scala:3074)
    at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$transformUp$2(TreeNode.scala:333)
    at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:72)
    at org.apache.spark.sql.catalyst.trees.TreeNode.transformUp(TreeNode.scala:333)
    at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$transformUp$1(TreeNode.scala:330)
    at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$mapChildren$1(TreeNode.scala:399)
    at org.apache.spark.sql.catalyst.trees.TreeNode.mapProductIterator(TreeNode.scala:237)
    at org.apache.spark.sql.catalyst.trees.TreeNode.mapChildren(TreeNode.scala:397)
    at org.apache.spark.sql.catalyst.trees.TreeNode.mapChildren(TreeNode.scala:350)
    at org.apache.spark.sql.catalyst.trees.TreeNode.transformUp(TreeNode.scala:330)
[...]
    at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$mapChildren$2(TreeNode.scala:416)
    at scala.collection.MapLike$MappedValues.$anonfun$iterator$3(MapLike.scala:257)
    at scala.collection.Iterator$$anon$10.next(Iterator.scala:459)
    at scala.collection.Iterator.foreach(Iterator.scala:941)
    at scala.collection.Iterator.foreach$(Iterator.scala:941)
    at scala.collection.AbstractIterator.foreach(Iterator.scala:1429)
    at scala.collection.IterableLike.foreach(IterableLike.scala:74)
    at scala.collection.IterableLike.foreach$(IterableLike.scala:73)
    at scala.collection.IterableLike$$anon$1.foreach(IterableLike.scala:331)
    at scala.collection.generic.Growable.$plus$plus$eq(Growable.scala:62)
    at scala.collection.generic.Growable.$plus$plus$eq$(Growable.scala:53)
    at scala.collection.mutable.MapBuilder.$plus$plus$eq(MapBuilder.scala:28)
    at scala.collection.TraversableViewLike.force(TraversableViewLike.scala:91)
    at scala.collection.TraversableViewLike.force$(TraversableViewLike.scala:89)
    at scala.collection.IterableLike$$anon$1.force(IterableLike.scala:331)
    at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$mapChildren$1(TreeNode.scala:424)
    at org.apache.spark.sql.catalyst.trees.TreeNode.mapProductIterator(TreeNode.scala:237)
    at org.apache.spark.sql.catalyst.trees.TreeNode.mapChildren(TreeNode.scala:397)
    at org.apache.spark.sql.catalyst.trees.TreeNode.mapChildren(TreeNode.scala:350)
    at org.apache.spark.sql.catalyst.trees.TreeNode.transformUp(TreeNode.scala:330)
    at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$transformUp$1(TreeNode.scala:330)
    at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$mapChildren$1(TreeNode.scala:399)
    at org.apache.spark.sql.catalyst.trees.TreeNode.mapProductIterator(TreeNode.scala:237)
    at org.apache.spark.sql.catalyst.trees.TreeNode.mapChildren(TreeNode.scala:397)
    at org.apache.spark.sql.catalyst.trees.TreeNode.mapChildren(TreeNode.scala:350)
    at org.apache.spark.sql.catalyst.trees.TreeNode.transformUp(TreeNode.scala:330)
    at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$transformUp$1(TreeNode.scala:330)
    at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$mapChildren$1(TreeNode.scala:399)
    at org.apache.spark.sql.catalyst.trees.TreeNode.mapProductIterator(TreeNode.scala:237)
    at org.apache.spark.sql.catalyst.trees.TreeNode.mapChildren(TreeNode.scala:397)
    at org.apache.spark.sql.catalyst.trees.TreeNode.mapChildren(TreeNode.scala:350)
    at org.apache.spark.sql.catalyst.trees.TreeNode.transformUp(TreeNode.scala:330)
    at org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveDeserializer$$anonfun$apply$31$$anonfun$applyOrElse$170$$anonfun$10.applyOrElse(Analyzer.scala:3074)
    at org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveDeserializer$$anonfun$apply$31$$anonfun$applyOrElse$170$$anonfun$10.applyOrElse(Analyzer.scala:3070)
    at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$transformDown$1(TreeNode.scala:309)
    at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:72)
    at org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:309)
    at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$transformDown$3(TreeNode.scala:314)
    at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$mapChildren$1(TreeNode.scala:399)
    at org.apache.spark.sql.catalyst.trees.TreeNode.mapProductIterator(TreeNode.scala:237)
    at org.apache.spark.sql.catalyst.trees.TreeNode.mapChildren(TreeNode.scala:397)
    at org.apache.spark.sql.catalyst.trees.TreeNode.mapChildren(TreeNode.scala:350)
    at org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:314)
    at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$transformDown$3(TreeNode.scala:314)
    at org.apache.spark.sql.catalyst.trees.TreeNode.mapChild$2(TreeNode.scala:368)
    at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$mapChildren$4(TreeNode.scala:427)
    at scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:238)
    at scala.collection.immutable.List.foreach(List.scala:392)
    at scala.collection.TraversableLike.map(TraversableLike.scala:238)
    at scala.collection.TraversableLike.map$(TraversableLike.scala:231)
    at scala.collection.immutable.List.map(List.scala:298)
    at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$mapChildren$1(TreeNode.scala:427)
    at org.apache.spark.sql.catalyst.trees.TreeNode.mapProductIterator(TreeNode.scala:237)
    at org.apache.spark.sql.catalyst.trees.TreeNode.mapChildren(TreeNode.scala:397)
    at org.apache.spark.sql.catalyst.trees.TreeNode.mapChildren(TreeNode.scala:350)
    at org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:314)
    at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$transformDown$3(TreeNode.scala:314)
    at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$mapChildren$2(TreeNode.scala:416)
    at scala.collection.MapLike$MappedValues.$anonfun$iterator$3(MapLike.scala:257)
    at scala.collection.Iterator$$anon$10.next(Iterator.scala:459)
    at scala.collection.Iterator.foreach(Iterator.scala:941)
    at scala.collection.Iterator.foreach$(Iterator.scala:941)
    at scala.collection.AbstractIterator.foreach(Iterator.scala:1429)
    at scala.collection.IterableLike.foreach(IterableLike.scala:74)
    at scala.collection.IterableLike.foreach$(IterableLike.scala:73)
    at scala.collection.IterableLike$$anon$1.foreach(IterableLike.scala:331)
    at scala.collection.generic.Growable.$plus$plus$eq(Growable.scala:62)
    at scala.collection.generic.Growable.$plus$plus$eq$(Growable.scala:53)
    at scala.collection.mutable.MapBuilder.$plus$plus$eq(MapBuilder.scala:28)
    at scala.collection.TraversableViewLike.force(TraversableViewLike.scala:91)
    at scala.collection.TraversableViewLike.force$(TraversableViewLike.scala:89)
    at scala.collection.IterableLike$$anon$1.force(IterableLike.scala:331)
    at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$mapChildren$1(TreeNode.scala:424)
    at org.apache.spark.sql.catalyst.trees.TreeNode.mapProductIterator(TreeNode.scala:237)
    at org.apache.spark.sql.catalyst.trees.TreeNode.mapChildren(TreeNode.scala:397)
    at org.apache.spark.sql.catalyst.trees.TreeNode.mapChildren(TreeNode.scala:350)
    at org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:314)
[...]
    at org.apache.spark.sql.catalyst.analysis.Analyzer.execute(Analyzer.scala:170)
    at org.apache.spark.sql.catalyst.encoders.ExpressionEncoder.resolveAndBind(ExpressionEncoder.scala:349)
    at org.apache.spark.sql.Dataset.resolvedEnc$lzycompute(Dataset.scala:252)
    at org.apache.spark.sql.Dataset.org$apache$spark$sql$Dataset$$resolvedEnc(Dataset.scala:251)
    at org.apache.spark.sql.Dataset$.apply(Dataset.scala:83)
    at org.apache.spark.sql.Dataset.as(Dataset.scala:475)
    at fr.ecoemploi.spark.dataset.entreprise.EntrepriseDataset.toDatasetEntreprise(EntrepriseDataset.java:320)
    at fr.ecoemploi.spark.dataset.entreprise.EntrepriseDataset.dsEntreprises(EntrepriseDataset.java:307)
    at fr.ecoemploi.spark.dataset.entreprise.EntrepriseDataset.collectEntreprisesEtEtablissements(EntrepriseDataset.java:366)
    at fr.ecoemploi.spark.dataset.entreprise.EntrepriseDatasetIT.entreprisesEtEtablissementsDeDouarnenez(EntrepriseDatasetIT.java:189)
    at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.base/java.lang.reflect.Method.invoke(Method.java:566)
    at org.junit.platform.commons.util.ReflectionUtils.invokeMethod(ReflectionUtils.java:688)
    at org.junit.jupiter.engine.execution.MethodInvocation.proceed(MethodInvocation.java:60)
    at org.junit.jupiter.engine.execution.InvocationInterceptorChain$ValidatingInvocation.proceed(InvocationInterceptorChain.java:131)
    at org.junit.jupiter.engine.extension.TimeoutExtension.intercept(TimeoutExtension.java:149)
    at org.junit.jupiter.engine.extension.TimeoutExtension.interceptTestableMethod(TimeoutExtension.java:140)
    at org.junit.jupiter.engine.extension.TimeoutExtension.interceptTestMethod(TimeoutExtension.java:84)
    at org.junit.jupiter.engine.execution.ExecutableInvoker$ReflectiveInterceptorCall.lambda$ofVoidMethod$0(ExecutableInvoker.java:115)
    at org.junit.jupiter.engine.execution.ExecutableInvoker.lambda$invoke$0(ExecutableInvoker.java:105)
    at org.junit.jupiter.engine.execution.InvocationInterceptorChain$InterceptedInvocation.proceed(InvocationInterceptorChain.java:106)
    at org.junit.jupiter.engine.execution.InvocationInterceptorChain.proceed(InvocationInterceptorChain.java:64)
    at org.junit.jupiter.engine.execution.InvocationInterceptorChain.chainAndInvoke(InvocationInterceptorChain.java:45)
    at org.junit.jupiter.engine.execution.InvocationInterceptorChain.invoke(InvocationInterceptorChain.java:37)
    at org.junit.jupiter.engine.execution.ExecutableInvoker.invoke(ExecutableInvoker.java:104)
    at org.junit.jupiter.engine.execution.ExecutableInvoker.invoke(ExecutableInvoker.java:98)
    at org.junit.jupiter.engine.descriptor.TestMethodTestDescriptor.lambda$invokeTestMethod$6(TestMethodTestDescriptor.java:210)
    at org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73)
    at org.junit.jupiter.engine.descriptor.TestMethodTestDescriptor.invokeTestMethod(TestMethodTestDescriptor.java:206)
    at org.junit.jupiter.engine.descriptor.TestMethodTestDescriptor.execute(TestMethodTestDescriptor.java:131)
    at org.junit.jupiter.engine.descriptor.TestMethodTestDescriptor.execute(TestMethodTestDescriptor.java:65)
    at org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$5(NodeTestTask.java:139)
    at org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73)
    at org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$7(NodeTestTask.java:129)
    at org.junit.platform.engine.support.hierarchical.Node.around(Node.java:137)
    at org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$8(NodeTestTask.java:127)
    at org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73)
    at org.junit.platform.engine.support.hierarchical.NodeTestTask.executeRecursively(NodeTestTask.java:126)
    at org.junit.platform.engine.support.hierarchical.NodeTestTask.execute(NodeTestTask.java:84)
    at java.base/java.util.ArrayList.forEach(ArrayList.java:1541)
    at org.junit.platform.engine.support.hierarchical.SameThreadHierarchicalTestExecutorService.invokeAll(SameThreadHierarchicalTestExecutorService.java:38)[...]
    at org.eclipse.jdt.internal.junit.runner.RemoteTestRunner.main(RemoteTestRunner.java:210)
Run Code Online (Sandbox Code Playgroud)
2287 entreprises ont été lues.
{{314551813, Activité principale : 56.30Z (NAFRev2), effectif salarié : 00 (2017, employeur : null), active : null, dernier traitement : 24 juin 2019, historisation débutée le 1 janv. 2008, nombre de périodes sans changement : 3}, nombre d'établissements : 1, catégorie entreprise : PME (2 017), catégorie juridique : 1000, n° répertoire national des associations : null, Economie Sociale et Solidaire : null, NIC de l'établissement siège : 00012, sigle : null, dénomination de l'entreprise : {18}, dénominations usuelles 1 : HOTEL BAR LA RADE, 2 :{19}, 3 : {20}, 4 : {21} , Nom de naissance : HERAUD, Nom d'usage : HASCOET, prénom usuel : MICHELINE, autres prénoms : MICHELINE, pseudonyme : null, sexe : F, purgée : null, date de création : 1 janv. 1978}
    {{31455181300012, Activité principale : 56.30Z (NAFRev2), effectif salarié : 00 (2017, employeur : null), active : null, dernier traitement : 24 juin 2019, historisation débutée le 1 janv. 2008, nombre de périodes sans changement : 3}, activité au registre des métiers : null, date de création de l'établissement : 1978-01-01, établissement siège : false, dénomination de l'établissement : null, enseigne 1 : null, 2 : null, 3 : null, adresses : {anomalies : [], annulé logiquement : false, distribution spéciale : null, numéro dans la voie : 31, répétition : null, type de voie : QUAI, libellé de voie : DU GRAND PORT, complément d'adresse : null, code postal : 29100, cedex : null - 

Mar*_*han 1

在另一个专门针对某一点的问题的帮助下,我发现我的麻烦原因是此处未显示的一行,仅影响对象的构造Entreprise

Dataset<Row> ds = ...
   .withColumn("etablissements", lit(null).cast("map<string,string>"))
Run Code Online (Sandbox Code Playgroud)

并导致失败dataset.as(Encoders.bean(Entreprise.class))Spark 2.x在转换时没有检查值的类型,但开始在 中执行此操作3.x,并且似乎我为该转换声明的值有错误的类型。

我的map<string,string>应该是 a map<string,Etablissement>。但它不能完全这样写:

解决方案是:

StructType etablissementType = Encoders.bean(Etablissement.class).schema();

Dataset<Row> ds = ...
   .withColumn("etablissements", lit(null)
      .cast(DataTypes.createMapType(StringType, etablissementType)))
Run Code Online (Sandbox Code Playgroud)