Spark/Neo4j throws error: RuntimeException: java.util.Collections$UnmodifiableRandomAccessList is not a valid external type for schema of string

Cod*_*joy 6 scala geospatial neo4j neo4j-spatial apache-spark

The exact query:

call spatial.bbox('geom', {lat:37.5,lon:43.4}, {lat:37.6,lon:43.5}) yield node return node.altitude as altitude, node.detect_type as detect_type, node.gtype as gtype, node.toDateFormatLong as toDateFormatLong, node.change_area as change_area, node.latitude as latitude, node.longitude as longitude, node.fromDateFormatLong as fromDateFormatLong, node.iids as iids, node.detect_strength as detect_strength, node.fromDate as fromDate, node.bbox as bbox ORDER BY node.toDateFormatLong DESC

Sample dataset:

+----------+-------------+-------+------------------+-------------+----------+-----------+--------------------+---------------------------------------------------------------------+-----------------+----------+---------------------------------------------+
|"altitude"|"detect_type"|"gtype"|"toDateFormatLong"|"change_area"|"latitude"|"longitude"|"fromDateFormatLong"|"iids"                                                               |"detect_strength"|"fromDate"|"bbox"                                       |
+----------+-------------+-------+------------------+-------------+----------+-----------+--------------------+---------------------------------------------------------------------+-----------------+----------+---------------------------------------------+
|-1        |"Arrival"    |1      |20161104          |16981        |37.5608649|43.4297988 |20161023            |"23OCT16S1A89377_09_IW1_09_pp_1231_04NOV16S1A90776_09_123_31_TT_QQQQ"|7.2              |"23OCT16" |[43.4297988,37.5608649,43.4297988,37.5608649]|
+----------+-------------+-------+------------------+-------------+----------+-----------+--------------------+---------------------------------------------------------------------+-----------------+----------+---------------------------------------------+
|-1        |"Arrival"    |1      |20161104          |3123         |37.56749  |43.4807208 |20161023            |"23OCT16S1A89377_09_IW1_09_pp_1231_04NOV16S1A90776_09_124_32_TT_QQQQ"|7.5              |"23OCT16" |[43.4807208,37.56749,43.4807208,37.56749]    |
+----------+-------------+-------+------------------+-------------+----------+-----------+--------------------+---------------------------------------------------------------------+-----------------+----------+---------------------------------------------+

I call it like this:

try {
  // Run the Cypher query through the connector and collect all rows on the driver
  val initialDf2 = neo.cypher(query).loadDataFrame
  val someVal = initialDf2.collectAsList()
} catch {
  case e: Exception => e.printStackTrace()
}

I get this error:

 17/09/18 08:44:48 ERROR TaskSetManager: Task 0 in stage 298.0 failed 1 times; aborting job
    org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 298.0 failed 1 times, most recent failure: Lost task 0.0 in stage 298.0 (TID 298, localhost, executor driver): java.lang.RuntimeException: Error while encoding: java.lang.RuntimeException: java.util.Collections$UnmodifiableRandomAccessList is not a valid external type for schema of string
    if (assertnotnull(input[0, org.apache.spark.sql.Row, true]).isNullAt) null else validateexternaltype(getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true]), 0, altitude), DoubleType) AS altitude#1678
    if (assertnotnull(input[0, org.apache.spark.sql.Row, true]).isNullAt) null else staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, validateexternaltype(getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true]), 1, detect_type), StringType), true) AS detect_type#1679
    if (assertnotnull(input[0, org.apache.spark.sql.Row, true]).isNullAt) null else validateexternaltype(getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true]), 2, gtype), LongType) AS gtype#1680L
    if (assertnotnull(input[0, org.apache.spark.sql.Row, true]).isNullAt) null else validateexternaltype(getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true]), 3, toDateFormatLong), LongType) AS toDateFormatLong#1681L
    if (assertnotnull(input[0, org.apache.spark.sql.Row, true]).isNullAt) null else validateexternaltype(getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true]), 4, change_area), LongType) AS change_area#1682L
    if (assertnotnull(input[0, org.apache.spark.sql.Row, true]).isNullAt) null else validateexternaltype(getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true]), 5, latitude), DoubleType) AS latitude#1683
    if (assertnotnull(input[0, org.apache.spark.sql.Row, true]).isNullAt) null else validateexternaltype(getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true]), 6, longitude), DoubleType) AS longitude#1684
    if (assertnotnull(input[0, org.apache.spark.sql.Row, true]).isNullAt) null else validateexternaltype(getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true]), 7, fromDateFormatLong), LongType) AS fromDateFormatLong#1685L
    if (assertnotnull(input[0, org.apache.spark.sql.Row, true]).isNullAt) null else staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, validateexternaltype(getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true]), 8, iids), StringType), true) AS iids#1686
    if (assertnotnull(input[0, org.apache.spark.sql.Row, true]).isNullAt) null else validateexternaltype(getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true]), 9, detect_strength), DoubleType) AS detect_strength#1687
    if (assertnotnull(input[0, org.apache.spark.sql.Row, true]).isNullAt) null else staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, validateexternaltype(getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true]), 10, fromDate), StringType), true) AS fromDate#1688
    if (assertnotnull(input[0, org.apache.spark.sql.Row, true]).isNullAt) null else staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, validateexternaltype(getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true]), 11, bbox), StringType), true) AS bbox#1689
        at org.apache.spark.sql.catalyst.encoders.ExpressionEncoder.toRow(ExpressionEncoder.scala:290)
        at org.apache.spark.sql.SparkSession$$anonfun$3.apply(SparkSession.scala:573)

If I leave bbox out of the query, the data is returned.
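
In the trace, the encoder line for bbox validates the value against StringType, while the error names a Java list class. A minimal sketch of that mismatch, using a stand-in list built locally rather than fetched from Neo4j (the object name `BboxAsString` and the sample values are for illustration only). It also shows one way such a list could be flattened into a plain string if you process the rows yourself:

```scala
import java.util.{Arrays, Collections, List => JList}
import scala.collection.JavaConverters._

object BboxAsString {
  // Stand-in for a list-valued property; built locally, not read from Neo4j.
  val bbox: JList[java.lang.Double] =
    Collections.unmodifiableList(
      Arrays.asList[java.lang.Double](43.4297988, 37.5608649, 43.4297988, 37.5608649))

  // The runtime class matches the one named in the error message.
  val className: String = bbox.getClass.getName

  // Flatten the list into a single string, which is what a string column can hold.
  val asString: String = bbox.asScala.mkString("[", ",", "]")
}
```

This only demonstrates the type mismatch; it does not change how the connector infers the schema.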

In the Neo4j browser, I can run the query in question and get results back:

-1  "Detected"  1   20161104    3318    37.5049815  43.4171031  20161023     "filename.val" 9.2 "23OCT16"   [43.4171031, 37.5049815, 43.4171031, 37.5049815]

bbox is a nested list, so I probably have to return node.bbox.somevalue1 as bbbox1, but I don't know what the exact syntax is....
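
Cypher indexes lists with square brackets (`list[0]`), so one option is to return each bbox element as its own scalar column. A minimal sketch in Scala that only builds the query string: the aliases `bbox0` to `bbox3` and the object name `BboxQuery` are made up for illustration, the column list is shortened, and it assumes bbox always has four elements as in the sample rows. Whether this avoids the encoder error with a given connector version is untested here.

```scala
object BboxQuery {
  // Assumption: bbox always holds exactly four numbers, as in the sample rows.
  // Produces "node.bbox[0] as bbox0, node.bbox[1] as bbox1, ..."
  val bboxColumns: String =
    (0 until 4).map(i => s"node.bbox[$i] as bbox$i").mkString(", ")

  // Same spatial call as the original query, with bbox split into scalar columns.
  val query: String =
    "call spatial.bbox('geom', {lat:37.5,lon:43.4}, {lat:37.6,lon:43.5}) yield node " +
      "return node.latitude as latitude, node.longitude as longitude, " +
      bboxColumns + " " +
      "ORDER BY node.toDateFormatLong DESC"
}
```

The resulting `BboxQuery.query` string could then be passed to `neo.cypher(...)` in place of the original query.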

I think this is similar to the problem I'm having...

Neo4j spark connector loadDataFrame gives error

Workaround:

https://github.com/neo4j-contrib/neo4j-spark-connector/issues/40

It looks like it wants more from what I'm returning.

小智 -1

With loadDataFrame you need to declare the DataFrame schema as (fieldName, fieldType) pairs.

Like this:

     val rawGraphnode=neo.cypher("MATCH (n:person)where (n.duration <>0) RETURN  n.user as user,n.other as other,n.direction as direction,n.duration as duration,n.timestamp as  timestamp")
            .loadDataFrame(schema = ("user","object"),("other","object"),("direction","string"),("duration","String"),("timestamp","String"))

    rawGraphnode.printSchema()
    rawGraphnode.show(10)