使用正则表达式进行Spark过滤

lea*_*... 3 scala apache-spark rdd

我试图按日期过滤文件数据为好的和坏的数据,因此将获得2个结果文件.从测试文件中,前4行需要输入良好数据,最后2行需要输入错误数据.

我有2个问题

  1. 我没有得到任何好的数据,结果文件是空的
  2. 坏数据结果如下所示 - 仅提取名称字符

    (,C,h)(,J,u)(,T,h)(,J,o)(,N,e)(,B,i)

测试文件

Christopher|Jan 11, 2017|5 
Justin|11 Jan, 2017|5 
Thomas|6/17/2017|5 
John|11-08-2017|5 
Neli|2016|5 
Bilu||5
Run Code Online (Sandbox Code Playgroud)

加载和RDD

scala> val file = sc.textFile("test/data.txt")
scala> val fileRDD = file.map(x => x.split("|"))
Run Code Online (Sandbox Code Playgroud)

正则表达式

scala> val singleReg = """(\w(3))\s(\d+)(,)\s(\d(4))|(\d+)\s(\w(3))(,)\s(\d(4))|(\d+)(\/)(\d+)(\/)(\d(4))|(\d+)(-)(\d+)(-)(\d(4))""".r
Run Code Online (Sandbox Code Playgroud)

开头和结尾都有三个"(双引号),这里有重要的吗?"

过滤 问题区域

scala> val validSingleRecords = fileRDD.filter(x => (singleReg.pattern.matcher(x(1)).matches))
scala> val badSingleRecords = fileRDD.filter(x => !(singleReg.pattern.matcher(x(1)).matches))
Run Code Online (Sandbox Code Playgroud)

将数组转换为字符串

scala> val validSingle = validSingleRecords.map(x => (x(0),x(1),x(2)))
scala> val badSingle = badSingleRecords.map(x => (x(0),x(1),x(2)))
Run Code Online (Sandbox Code Playgroud)

写文件

scala> validSingle.repartition(1).saveAsTextFile("data/singValid")
scala> badSingle.repartition(1).saveAsTextFile("data/singBad")
Run Code Online (Sandbox Code Playgroud)

更新1 我的上面的正则表达式是错误的,我已将其更新为.在scala中,反斜杠是一个转义字符,因此需要复制

val singleReg = """\\w{3}\\s\\d+,\\s\\d{4}|\\d+\\s\\w{3},\\s\\d{4}|\\d+\/\\d+\/\\d{4}|\\d+-\\d+-\\d{4}""".r
Run Code Online (Sandbox Code Playgroud)

检查regex101上的正则表达式,前4行中的日期通过.

我再次运行测试,我仍然得到相同的结果.

him*_*ian 5

代码有2个问题:

  1. 您用来分割线条的字符data.txt是错误的.它应该是'|'代替"|".
  2. 正则表达式singleReg是错误的.

正确的代码如下:

加载和RDD

scala> val file = sc.textFile("test/data.txt")
scala> val fileRDD = file.map(x => x.split('|'))
Run Code Online (Sandbox Code Playgroud)

正则表达式

scala> val singleReg = """\w{3}\s\d{2},\s\d{4}|\d{2}\s\w{3},\s\d{4}|\d{1}\/\d{2}\/\d{4}|\d{2}-\d{2}-\d{4}""".r
Run Code Online (Sandbox Code Playgroud)

过滤

scala> val validSingleRecords = fileRDD.filter(x => (singleReg.pattern.matcher(x(1)).matches))
scala> val badSingleRecords = fileRDD.filter(x => !(singleReg.pattern.matcher(x(1)).matches))
Run Code Online (Sandbox Code Playgroud)

将数组转换为字符串

scala> val validSingle = validSingleRecords.map(x => (x(0),x(1),x(2)))
scala> val badSingle = badSingleRecords.map(x => (x(0),x(1),x(2)))
Run Code Online (Sandbox Code Playgroud)

写文件

scala> validSingle.repartition(1).saveAsTextFile("data/singValid")
scala> badSingle.repartition(1).saveAsTextFile("data/singBad")
Run Code Online (Sandbox Code Playgroud)

上面的代码将为您提供以下输出 -

数据/ singValid

(Christopher,Jan 11, 2017,5 )
(Justin,11 Jan, 2017,5 )
(Thomas,6/17/2017,5 )
(John,11-08-2017,5 )
Run Code Online (Sandbox Code Playgroud)

数据/ singBad

(Neli,2016,5 )
(Bilu,,5)
Run Code Online (Sandbox Code Playgroud)