nav*_*ore 16 scala apache-spark
Using Spark 1.5 and Scala 2.10.6.
I'm trying to filter a DataFrame on the field "tags", which is an array of strings. I want to find all rows that have the tag "private".
val report = df.select("*")
.where(df("tags").contains("private"))
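I get:
Exception in thread "main" org.apache.spark.sql.AnalysisException: cannot resolve 'Contains(tags, private)' due to data type mismatch: argument 1 requires string type, however, 'tags' is of array type.
Is the filter method better suited?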
Update:
The data comes from the Cassandra adapter, but here is a minimal example that shows what I'm trying to do and produces the same error:
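import org.apache.spark.SparkContext
import org.apache.spark.sql.DataFrame

// Builds a two-row DataFrame with schema [name: string, tags: array<string>].
def testData(sc: SparkContext): DataFrame = {
  val stringRDD = sc.parallelize(Seq("""
    { "name": "ed",
      "tags": ["red", "private"]
    }""",
    """{ "name": "fred",
      "tags": ["public", "blue"]
    }""")
  )
  val sqlContext = new org.apache.spark.sql.SQLContext(sc)
  import sqlContext.implicits._
  sqlContext.read.json(stringRDD)
}

def run(sc: SparkContext): Unit = {
  val df1 = testData(sc)
  df1.show()
  // Fails: Column.contains expects a string column, but "tags" is an array.
  val report = df1.select("*")
    .where(df1("tags").contains("private"))
  report.show()
}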
Update: the tags array can be of any length, and the "private" tag can be at any position.
Update: one solution that works is a UDF:
import scala.collection.mutable
import org.apache.spark.sql.functions.udf
val filterPriv = udf { (tags: mutable.WrappedArray[String]) => tags.contains("private") }
val report = df1.filter(filterPriv(df1("tags")))
Rob*_*ier 27
I think it will work if you use where(array_contains(...)). Here is my result:
scala> import org.apache.spark.SparkContext
import org.apache.spark.SparkContext
scala> import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.DataFrame
scala> def testData (sc: SparkContext): DataFrame = {
| val stringRDD = sc.parallelize(Seq
| ("""{ "name": "ned", "tags": ["blue", "big", "private"] }""",
| """{ "name": "albert", "tags": ["private", "lumpy"] }""",
| """{ "name": "zed", "tags": ["big", "private", "square"] }""",
| """{ "name": "jed", "tags": ["green", "small", "round"] }""",
| """{ "name": "ed", "tags": ["red", "private"] }""",
| """{ "name": "fred", "tags": ["public", "blue"] }"""))
| val sqlContext = new org.apache.spark.sql.SQLContext(sc)
| import sqlContext.implicits._
| sqlContext.read.json(stringRDD)
| }
testData: (sc: org.apache.spark.SparkContext)org.apache.spark.sql.DataFrame
scala>
| val df = testData (sc)
df: org.apache.spark.sql.DataFrame = [name: string, tags: array<string>]
scala> val report = df.select ("*").where (array_contains (df("tags"), "private"))
report: org.apache.spark.sql.DataFrame = [name: string, tags: array<string>]
scala> report.show
+------+--------------------+
|  name|                tags|
+------+--------------------+
|   ned|[blue, big, private]|
|albert|    [private, lumpy]|
|   zed|[big, private, sq...|
|    ed|      [red, private]|
+------+--------------------+
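Note that where(array_contains(df("tags"), "private")) works, but where(df("tags").array_contains("private")), which is more directly analogous to what you originally wrote, fails with array_contains is not a member of org.apache.spark.sql.Column. Looking at the source code for Column, I see there is code to handle contains (it constructs a Contains instance) but nothing for array_contains. Maybe that is an oversight.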
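For use outside the shell, the same filter might be packaged as in the minimal sketch below. It assumes Spark 1.5 or later on the classpath and imports array_contains explicitly from org.apache.spark.sql.functions. The object name ArrayContainsSketch, the helper namesTagged, and the two sample rows (borrowed from the question) are only illustrative.

```scala
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.SQLContext
import org.apache.spark.sql.functions.array_contains

// Hypothetical wrapper object; the names here are illustrative only.
object ArrayContainsSketch {

  // Returns the "name" of every row whose "tags" array contains `tag`.
  def namesTagged(sqlContext: SQLContext, tag: String): Seq[String] = {
    import sqlContext.implicits._

    // Sample rows taken from the question's minimal example.
    val df = Seq(
      ("ed", Seq("red", "private")),
      ("fred", Seq("public", "blue"))
    ).toDF("name", "tags")

    // array_contains is a standalone function, not a method on Column.
    df.where(array_contains(df("tags"), tag))
      .select("name")
      .collect()
      .map(_.getString(0))
      .toSeq
  }

  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("array-contains-sketch").setMaster("local[*]")
    val sc = new SparkContext(conf)
    try {
      val sqlContext = new SQLContext(sc)
      println(namesTagged(sqlContext, "private").mkString(", "))
    } finally {
      sc.stop()
    }
  }
}
```

Because array_contains takes the array column and the value as arguments, it works for arrays of any length and for a match at any position, without the UDF from the question.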