小编ukb*_*baz的帖子

将文件记录到Pandas Dataframe

我有日志文件,其中有许多行以下列形式:

LogLevel    [13/10/2015 00:30:00.650]  [Message Text]
Run Code Online (Sandbox Code Playgroud)

我的目标是将日志文件中的每一行转换为一个漂亮的数据框架.我已经厌倦了这样做,通过分割[字符上的线条,但是我仍然没有得到一个整洁的数据帧.

我的代码:

level = []
time = []
text = []

   with open(filename) as inf:
     for line in inf:
       parts = line.split('[')
         if len(parts) > 1:  
           level = parts[0]
           time = parts[1]
           text = parts[2]
        print (parts[0],parts[1],parts[2])

 s1 = pd.Series({'Level':level, 'Time': time, 'Text':text})
 df = pd.DataFrame(s1).reset_index()
Run Code Online (Sandbox Code Playgroud)

继承我打印的数据框:

Info      10/08/16 10:56:09.843]   In Function CCatalinaPrinter::ItemDescription()]

Info      10/08/16 10:56:09.843]   Sending UPC Description Message ]
Run Code Online (Sandbox Code Playgroud)

如何改进这个以去除空白和另一个']'字符

谢谢

python data-analysis dataframe python-3.x pandas

6
推荐指数
1
解决办法
1万
查看次数

在集群模式下将 PySpark 应用程序提交到 YARN 上的 Spark

我正在尝试测试一个为我工作的团队构建的大数据平台。它在 YARN 上运行 Spark。

是否可以创建 PySpark 应用程序并将其提交到 YARN 集群上?

我能够成功提交示例 SparkPi jar 文件,它返回 YARN 标准输出日志中的输出。

这是我正在尝试测试的 PySpark 代码;

from pyspark import SparkConf
from pyspark import SparkContext

HDFS_MASTER = 'hadoop-master'

conf = SparkConf()
conf.setMaster('yarn')
conf.setAppName('spark-test')
sc = SparkContext(conf=conf)

distFile = sc.textFile('hdfs://{0}:9000/tmp/test/test.csv'.format(HDFS_MASTER))

nonempty_lines = distFile.filter(lambda x: len(x) > 0)
print ('Nonempty lines', nonempty_lines.count())
Run Code Online (Sandbox Code Playgroud)

我在 Spark 目录中的 CMD 中尝试的命令:

bin\spark-submit --master yarn --deploy-mode cluster --driver-memory 4g
executor-memory 2g --executor-cores 1 examples\sparktest2.py 10
Run Code Online (Sandbox Code Playgroud)

sparktest2.py我的脚本在我的 Spark 目录中的示例目录中调用。

日志(标准错误):

 application from cluster with 3 NodeManagers …
Run Code Online (Sandbox Code Playgroud)

python hadoop hadoop-yarn apache-spark pyspark

6
推荐指数
1
解决办法
2万
查看次数

使用SparkSession创建广播变量?Spark 2.0

是否可以使用SparkSession提供的sparkContext创建广播变量?我在sc.broadcast下不断收到错误,但是在使用org.apache.spark.SparkContext的SparkContext时,在另一个项目中我没有遇到任何问题.

import org.apache.spark.sql.SparkSession


object MyApp {
 def main(args: Array[String]){
  val spark = SparkSession.builder()
       .appName("My App")
       .master("local[*]")
       .getOrCreate()

  val sc = spark.sparkContext
        .setLogLevel("ERROR")

  val path = "C:\\Boxes\\github-archive\\2015-03-01-0.json"
  val ghLog = spark.read.json(path)


  val pushes = ghLog.filter("type = 'PushEvent'")

  pushes.printSchema()
  println("All events: "+ ghLog.count)
  println("Only pushes: "+pushes.count)
  pushes.show(5)


  val grouped = pushes.groupBy("actor.login").count()
  grouped.show(5)


  val ordered = grouped.orderBy(grouped("count").desc)
  ordered.show(5)

  import scala.io.Source.fromFile
  val fileName= "ghEmployees.txt"
  val employees = Set() ++ ( 
    for { 
      line <- fromFile(fileName).getLines()
    } yield line.trim
    )


  val bcEmployees = sc.broadcast(employees)
 } …
Run Code Online (Sandbox Code Playgroud)

scala apache-spark apache-spark-sql spark-dataframe

3
推荐指数
1
解决办法
4203
查看次数

如何将 JSON 的 RDD 转换为 Dataframe?

我有一个从一些 JSON 创建的 RDD,RDD 中的每条记录都包含键/值对。我的 RDD 看起来像:

myRdd.foreach(println)
Run Code Online (Sandbox Code Playgroud)
myRdd.foreach(println)
Run Code Online (Sandbox Code Playgroud)

我会将每条记录转换为 spark 数据框中的一行, trackingInfo 中的嵌套字段应该有自己的列,type列表也应该是自己的列。

到目前为止,我已经厌倦了使用案例类来拆分它:

case class Event(
    sequence: String, 
    id: String, 
    trackingInfo:String,
    location:String, 
    row:String, 
    trackId: String, 
    listrequestId: String, 
    videoId:String, 
    rank: String, 
    requestId: String, 
    `type`:String, 
    time: String)

val dataframeRdd = myRdd.map(line => line.split(",")).
    map(array => Event(
        array(0).split(":")(1),
        array(1).split(":")(1),
        array(2).split(":")(1),
        array(3).split(":")(1),
        array(4).split(":")(1),
        array(5).split(":")(1),
        array(6).split(":")(1),
        array(7).split(":")(1),
        array(8).split(":")(1),
        array(9).split(":")(1),
        array(10).split(":")(1),
        array(11).split(":")(1)
        ))
Run Code Online (Sandbox Code Playgroud)

但是我不断收到java.lang.ArrayIndexOutOfBoundsException: 1错误。

做这个的最好方式是什么 ?如您所见,第 5 条记录在某些属性的顺序上略有不同。是否可以根据属性名称进行解析而不是根据“,”等进行拆分?

我正在使用 Spark 1.6.x

scala scala-collections apache-spark apache-spark-sql

2
推荐指数
1
解决办法
3129
查看次数

如何从python 3中的字典值列表中删除nan值?

嗨,我有一个字典列表,每个字典都有一个值列表。在该列表中有nan我希望删除的值。这是一个示例字典;

temp = {'A': ['field1', 'field2', 'field3', np.nan, np.nan], 'B': ['field1', 'field2', 'field3', 'field4', np.nan]}
Run Code Online (Sandbox Code Playgroud)

看起来像;

{'A': ['field1', 'field2', 'field3', nan, nan], 'B': ['field1', 'field2', 'field3', 'field4', nan]}
Run Code Online (Sandbox Code Playgroud)

我想要的输出是:

{'A': ['field1', 'field2', 'field3'], 'B': ['field1', 'field2', 'field3', 'field4']}
Run Code Online (Sandbox Code Playgroud)

我已经厌倦了以下没有成功;

res = {k:v for k,v in temp2.items() if v is not np.nan}
Run Code Online (Sandbox Code Playgroud)

任何帮助表示赞赏

python dictionary python-3.x pandas

1
推荐指数
1
解决办法
2057
查看次数