I am new to Scala.
My requirement is to read the file line by line, split each line on specific delimiters, and put the extracted values into the corresponding columns of different files.
Here is a sample of my input data:
ABC Log
Aug 10 14:36:52 127.0.0.1 CEF:0|McAfee|ePolicy Orchestrator|IFSSLCRT0.5.0.5/epo4.0|2410|DeploymentTask|High eventId=34 externalId=23
Aug 10 15:45:56 127.0.0.1 CEF:0|McAfee|ePolicy Orchestrator|IFSSLCRT0.5.0.5/epo4.0|2890|DeploymentTask|Medium eventId=888 externalId=7788
Aug 10 16:40:59 127.0.0.1 CEF:0|NV|ePolicy Orchestrator|IFSSLCRT0.5.0.5/epo4.0|2990|DeploymentTask|Low eventId=989 externalId=0004
XYZ Log
Aug 15 14:32:15 142.101.36.118 cef[10612]: CEF:0|fire|cc|3.5.1|FireEye Acquisition Started
Aug 16 16:45:10 142.101.36.189 cef[10612]: CEF:0|cold|dd|3.5.4|FireEye Acquisition Started
Aug 18 19:50:20 142.101.36.190 cef[10612]: CEF:0|fire|ee|3.5.6|FireEye Acquisition Started
In the data above, I need to read the first section under the "ABC Log" header, extract the values from each line, and put them under the corresponding columns. The first few column names are hard-coded; the last columns I need to extract by splitting on "=", i.e. eventId=34 externalId=23 => col = eventId, value = 34 and col = externalId, value = 23.
Column names
date time ip_address col1 col2 col3 col4 col5
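To make the "=" split concrete, here is a minimal sketch of what I mean in plain Scala (the tail string is hard-coded only for illustration):

// take the trailing "eventId=34 externalId=23" part of a line
val tail = "eventId=34 externalId=23"

// split on whitespace, then split each token once on "=" into (column, value)
val pairs = tail
  .split("\\s+")
  .map(_.split("=", 2))
  .collect { case Array(k, v) => (k, v) }

// pairs: Array((eventId,34), (externalId,23))
// each key would become a column name and each value the cell under it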
I want the output to look like the following:
This is the first section, "ABC Log", which should go into one file; the remaining sections should be handled the same way.
date time ip_address col1 col2 col3 col4 col5 col6 col7
Aug 10 14:36:52 127.0.0.1 CEF:0 McAfee ePolicy Orchestrator IFSSLCRT0.5.0.5/epo4.0 2410 DeploymentTask High
Aug 10 15:45:56 127.0.0.1 CEF:0 McAfee ePolicy Orchestrator IFSSLCRT0.5.0.5/epo4.0 2890 DeploymentTask Medium
Below is the code I have been trying:
package AV_POC_Parsing

import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._
import org.apache.spark.SparkConf
import org.apache.log4j.Logger

// For implicit conversions like converting RDDs to DataFrames
//import spark.implicits._

object AvLogParsing {

  def main(args: Array[String]) {
    // create Spark context with Spark configuration
    val sc = new SparkContext(new SparkConf().setAppName("AV_Log_Processing").setMaster("local[*]"))

    // read the text file into a Spark RDD
    val textFile = sc.textFile("input.txt")

    // split every line on single spaces
    val splitRdd = textFile.map(line => line.split(" "))   // splitRdd: RDD[Array[String]]

    // printing values
    splitRdd.foreach { x => x.foreach { y => println(y) } }

    // how to store the split values in different columns and write them into a file?
  }
}
How do I split on two delimiters in Scala?
Thanks.
Maybe this helps you.
import org.apache.spark.{SparkConf, SparkContext}

object DataFilter {

  def main(args: Array[String]): Unit = {
    // create Spark context with Spark configuration
    val sc = new SparkContext(new SparkConf().setAppName("AV_Log_Processing").setMaster("local[*]"))

    // read the text file into a Spark RDD
    val textFile = sc.textFile("input.txt")

    val splitRdd = textFile.map { s =>
      // split on a single space or a '|' character
      val a = s.split("[ |]")
      // re-join month and day into one date field
      val date = Array(a(0) + " " + a(1))
      // keep the date plus the last 10 fields, tab-separated
      (date ++ a.takeRight(10)).mkString("\t")
    }
    // splitRdd: RDD[String], one tab-separated record per input line

    // printing values
    splitRdd.foreach(println)

    // how to store the split values in different columns and write them into a file?
  }
}
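If you also need the two sections written to separate files, here is a rough sketch on top of the code above (only a sketch: it assumes the section headers end in "Log" exactly as in the sample, and the output paths are placeholders):

// drop the "ABC Log" / "XYZ Log" header lines and blank lines
val dataLines = textFile.filter(l => l.trim.nonEmpty && !l.trim.endsWith("Log"))

// distinguish the two formats; the XYZ lines contain "cef[" in the sample,
// adjust this predicate to whatever reliably separates your sections
val abcLines = dataLines.filter(!_.contains("cef["))
val xyzLines = dataLines.filter(_.contains("cef["))

// write each section to its own output directory (hypothetical paths)
abcLines.saveAsTextFile("output/abc_log")
xyzLines.saveAsTextFile("output/xyz_log")

These lines would go inside main, after textFile is defined.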