小编Tza*_*har的帖子

如何在使用scala读取文件后删除hdfs目录中的文件

我使用fileStream从Spark(流上下文)读取hdfs目录中的文件.如果我的Spark关闭并在一段时间后启动,我想读取目录中的新文件.我不想读取已经由Spark读取和处理的目录中的旧文件.我想在这里避免重复.

val lines = ssc.fileStream[LongWritable, Text, TextInputFormat]("/home/File")
Run Code Online (Sandbox Code Playgroud)

任何代码片段都有帮助吗?

hadoop scala apache-spark spark-streaming

10
推荐指数
1
解决办法
2万
查看次数

如何从spark shell设置spark.local.dir属性?

我正在尝试spark.local.dir使用spark-shell进行设置sc.getconf.set("spark.local.dir","/temp/spark"),但它无法正常工作.有没有其他方法可以从sparkshell设置此属性.

scala apache-spark

7
推荐指数
1
解决办法
9708
查看次数

Spark Scala:任务不可序列化错误

我正在使用带有Scala插件和spark库的IntelliJ社区版.我还在学习Spark并且正在使用Scala工作表.

我写了下面的代码,删除字符串中的标点符号:

def removePunctuation(text: String): String = {
  val punctPattern = "[^a-zA-Z0-9\\s]".r
  punctPattern.replaceAllIn(text, "").toLowerCase
}
Run Code Online (Sandbox Code Playgroud)

然后我读了一个文本文件并尝试删除标点符号:

val myfile = sc.textFile("/home/ubuntu/data.txt",4).map(removePunctuation)
Run Code Online (Sandbox Code Playgroud)

这给出了如下错误,任何帮助将不胜感激:

org.apache.spark.SparkException:org.apache.spark.util上的org.apache.spark.util.ClosureCleaner $ .ensureSerializable(/home/ubuntu/src/main/scala/Test.sc:294)中的任务不可序列化.ClosureCleaner $ .org $ apache $ spark $ util $ ClosureCleaner $$ clean(/home/ubuntu/src/main/scala/Test.sc:284)org.apache.spark.util.ClosureCleaner $ .clean(/ home /ubuntu/src/main/scala/Test.sc:104)在org.apache.spark的org.apache.spark.SparkContext.clean(/home/ubuntu/src/main/scala/Test.sc:2090). rdd.RDD $$ anonfun $ map $ 1.apply(/home/ubuntu/src/main/scala/Test.sc:366)org.apache.spark.rdd.RDD $$ anonfun $ map $ 1.apply(/ home /ubuntu/src/main/scala/Test.sc:365)在org.apache.spark.rdd.RDDOperationScope $ .withScope(/home/ubuntu/src/main/scala/Test.sc:147)的#worksheet# .#worksheet#(/ home/ubuntu/src/main/scala/Test.sc:108)引起:java.io.NotSerializableException:A $ A21 $ A $ A21序列化堆栈: - 对象不可序列化(类:A $ A21 $ A $ A21,价值:A $ A21 $ A $ A21 @ …

scala apache-spark pyspark

7
推荐指数
2
解决办法
2万
查看次数

使用scala查找当前用户名

我希望在scala中找到当前的用户名.

在命令行上我可以这样做:

whoami 
Run Code Online (Sandbox Code Playgroud)

在python中,我可以这样做:

import getpass
user_name = getpass.getuser()
Run Code Online (Sandbox Code Playgroud)

如何在Scala中找到用户名?

scala

6
推荐指数
1
解决办法
2077
查看次数

Spark Dataframe更改列值

我得到了一些170列的数据帧.在一列中,我有一个"名称"字符串,当我将它们写入Postgres时,这个字符串有时会有一些特殊符号,如"'",这些符号是不合适的.我可以做那样的事情:

Df[$'name']=Df[$'name'].map(x => x.replaceAll("'","")) ?
Run Code Online (Sandbox Code Playgroud)

我不想解析完整的DataFrame,因为它非常庞大.请帮助我

scala dataframe apache-spark

6
推荐指数
2
解决办法
2万
查看次数

Scala:使用log4j将日志写入文件

我试图在eclipse中构建一个基于scala的jar文件,该文件使用log4j来创建日志。它可以在控制台中完美打印,但是当我尝试使用log4j.properties文件使其写入日志文件时,什么也没发生。

项目结构如下

具有Scala性质的Maven项目

loggerTest.scala

package scala.n*****.n****

import org.apache.log4j.Logger

object loggerTest extends LogHelper {
  def main(args : Array[String]){
    log.info("This is info");
    log.error("This is error");
    log.warn("This is warn");
  }
}

trait LogHelper {
  lazy val log = Logger.getLogger(this.getClass.getName)
}
Run Code Online (Sandbox Code Playgroud)

log4j.properties

# Root logger option
log4j.rootLogger=WARN, stdout, file

# Redirect log messages to console
log4j.appender.stdout=org.apache.log4j.ConsoleAppender
log4j.appender.stdout.Target=System.out
log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
log4j.appender.stdout.layout.ConversionPattern=%d{yyyy-MM-dd HH:mm:ss} %-5p %c{1}:%L - %m%n

# Redirect log messages to a log file
log4j.appender.file=org.apache.log4j.RollingFileAppender
log4j.appender.file.File=/home/abc/test/abc.log
log4j.appender.file.encoding=UTF-8
log4j.appender.file.MaxFileSize=2kB
log4j.appender.file.MaxBackupIndex=1
log4j.appender.file.layout=org.apache.log4j.PatternLayout
log4j.appender.file.layout.ConversionPattern=%d{yyyy-MM-dd HH:mm:ss} %-5p %c{1}:%L - %m%n
Run Code Online (Sandbox Code Playgroud)

pom.xml …

log4j scala jar apache-spark

5
推荐指数
1
解决办法
8243
查看次数

在 Spark Log 中获取完整的堆栈跟踪

我想在我的 spark 执行器日志中看到完整的堆栈跟踪。

我有例如:

引起:java.lang.RuntimeException:java.lang.Long 不是 org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificUnsafeProjection.apply_0$(generated.java:434) 处 int 架构的有效外部类型在 org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificUnsafeProjection.apply(generated.java:737) 在 org.apache.spark.sql.catalyst.encoders.ExpressionEncoder.toRow(ExpressionEncoder.scala:290) .. . 18 更多

我想看看还有 18 个失踪的。我曾尝试更改为日志记录级别等,但没有帮助。

scala apache-spark

5
推荐指数
1
解决办法
3424
查看次数

Spark SQL - 用模式读取csv

我在尝试使用Spark简单读取CSV文件时遇到了问题.经过这样的操作后,我想确保:

  • 数据类型是正确的(使用提供的架构)
  • 标头对提供的架构是正确的

这是我使用的代码,但有以下问题:

val schema = Encoders.product[T].schema
val df = spark.read
 .schema(schema)
 .option("header", "true")
 .csv(fileName)
Run Code Online (Sandbox Code Playgroud)

类型T是类型Product,即案例类.这可行,但它不检查列名是否正确,所以我可以给另一个文件,只要数据类型是正确的,不会发生错误,我不知道用户提供了错误的文件,但有些巧合正确的数据类型和正确的顺序.

我尝试使用推断模式然后.as[T]在数据集上使用方法的选项,但是如果除了String包含null 之外的任何列只有它由Spark解释为String列,但在我的模式中它是Integer.因此会发生强制转换异常,但已经检查了列名称.

总结一下:我找到了解决方案,我可以确保正确的数据类型,但没有标题和其他解决方案,我可以验证标题,但有数据类型的问题.有没有办法实现这两者,即标题和类型的完整验证?

我正在使用Spark 2.2.0.

csv validation schema scala apache-spark

5
推荐指数
1
解决办法
4940
查看次数

Scala-Spark使用参数值动态调用groupby和agg

我想编写自定义分组和聚合函数来获取用户指定的列名和用户指定的聚合映射.我不知道前面的列名和聚合映射.我想写一个类似下面的函数.但我是Scala的新手,我无法解决它.

def groupAndAggregate(df: DataFrame,  aggregateFun: Map[String, String], cols: List[String] ): DataFrame ={
  val grouped = df.groupBy(cols)
  val aggregated = grouped.agg(aggregateFun)
  aggregated.show()
}
Run Code Online (Sandbox Code Playgroud)

并希望称之为

val listOfStrings =  List("A", "B", "C")
val result = groupAndAggregate(df, Map("D"-> "SUM", "E"-> "COUNT"), listOfStrings)
Run Code Online (Sandbox Code Playgroud)

我怎样才能做到这一点?任何人都可以帮助我.

customization scala group-by aggregate apache-spark

4
推荐指数
1
解决办法
3851
查看次数

增加任务大小 spark

在 spark-shell 中执行代码时遇到问题。

[Stage 1:>             (0 + 0) / 16]
17/01/13 06:09:24 WARN TaskSetManager: Stage 1 contains a task of very large size (1057 KB). The maximum recommended task size is 100 KB.
[Stage 1:>             (0 + 4) / 16]
Run Code Online (Sandbox Code Playgroud)

在此警告之后,执行被阻止。

谁能解决?

我试过这个,但它不能解决问题。

val conf = new SparkConf()
    .setAppName("MyApp")
    .setMaster("local[*]")
    .set("spark.driver.maxResultSize", "3g")
    .set("spark.executor.memory" ,"3g");
val sc = new SparkContext(conf);`
Run Code Online (Sandbox Code Playgroud)

scala apache-spark

4
推荐指数
2
解决办法
8297
查看次数