标签: spark-dataframe

如何使用DataFrame和JDBC连接提高慢速Spark作业的性能?

我试图在单个节点(local [*])上以独立模式通过JDBC访问中型Teradata表(约1亿行).

我正在使用Spark 1.4.1.并且设置在一个非常强大的机器上(2个CPU,24个内核,126G RAM).

我已经尝试了几种内存设置和调整选项,以使其更快地工作,但它们都没有产生巨大的影响.

我确信有一些我缺少的东西,下面是我的最后一次尝试,花了大约11分钟来获得这个简单的计数与使用JDBC连接通过R只需要40秒来获得计数.

bin/pyspark --driver-memory 40g --executor-memory 40g

df = sqlContext.read.jdbc("jdbc:teradata://......)
df.count()
Run Code Online (Sandbox Code Playgroud)

当我尝试使用BIG表(5B记录)时,在完成查询后没有返回任何结果.

teradata apache-spark pyspark spark-dataframe

8
推荐指数
2
解决办法
1万
查看次数

Spark saveAsTextFile()导致Mkdirs无法为目录的一半创建

我目前在tomcat中运行Java Spark应用程序并收到以下异常:

Caused by: java.io.IOException: Mkdirs failed to create file:/opt/folder/tmp/file.json/_temporary/0/_temporary/attempt_201603031703_0001_m_000000_5
Run Code Online (Sandbox Code Playgroud)

在线上

text.saveAsTextFile("/opt/folder/tmp/file.json") //where text is a JavaRDD<String>

问题是/ opt/folder/tmp /已经存在并且成功创建了最多/opt/folder/tmp/file.json/_temporary/0/然后它会遇到与剩余部分相似的权限问题.路径_temporary/attempt_201603031703_0001_m_000000_5本身,但我给了tomcat用户权限(chown -R tomcat:tomcat tmp/chmod -R 755 tmp/)到tmp /目录.有谁知道会发生什么?

谢谢

编辑@javadba:

[root@ip tmp]# ls -lrta 
total 12
drwxr-xr-x 4 tomcat tomcat 4096 Mar  3 16:44 ..
drwxr-xr-x 3 tomcat tomcat 4096 Mar  7 20:01 file.json
drwxrwxrwx 3 tomcat tomcat 4096 Mar  7 20:01 .

[root@ip tmp]# cd file.json/
[root@ip file.json]# ls -lrta 
total 12
drwxr-xr-x 3 …
Run Code Online (Sandbox Code Playgroud)

java tomcat apache-spark spark-dataframe

8
推荐指数
1
解决办法
7801
查看次数

Spark Sql:TypeError("StructType不能接受类型%s中的对象"%type(obj))

我目前正在使用PyODBC从SQL Server中提取数据并尝试以近实时(NRT)方式插入Hive中的表.

我从源代码获得了一行并转换为List [Strings]并以编程方式创建了schema,但在创建DataFrame时,Spark抛出了StructType错误.

>>> cnxn = pyodbc.connect(con_string)
>>> aj = cnxn.cursor()
>>>
>>> aj.execute("select * from tjob")
<pyodbc.Cursor object at 0x257b2d0>

>>> row = aj.fetchone()

>>> row
(1127, u'', u'8196660', u'', u'', 0, u'', u'', None, 35, None, 0, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, u'', 0, None, None)
>>> rowstr = map(str,row)
>>> rowstr
['1127', '', '8196660', '', '', '0', '', '', 'None', '35', …
Run Code Online (Sandbox Code Playgroud)

python apache-spark apache-spark-sql spark-dataframe

8
推荐指数
1
解决办法
1万
查看次数

SparkSQL:我可以在同一个查询中分解两个不同的变量吗?

我有以下爆炸查询,工作正常:

data1 = sqlContext.sql("select explode(names) as name from data")
Run Code Online (Sandbox Code Playgroud)

我想爆炸另一个领域"颜色",所以最终输出可能是名称和颜色的笛卡尔积.所以我做了:

data1 = sqlContext.sql("select explode(names) as name, explode(colors) as color from data")
Run Code Online (Sandbox Code Playgroud)

但是我得到了错误:

 Only one generator allowed per select but Generate and and Explode found.;
Run Code Online (Sandbox Code Playgroud)

有谁有想法吗?


我实际上可以通过两个步骤使其工作:

   data1 = sqlContext.sql("select explode(names) as name from data")
   data1.registerTempTable('data1')
   data1 = sqlContext.sql("select explode(colors) as color from data1")
Run Code Online (Sandbox Code Playgroud)

但我想知道是否有可能一步到位?非常感谢!

apache-spark apache-spark-sql spark-dataframe

8
推荐指数
2
解决办法
1万
查看次数

Parquet vs Cassandra使用Spark和DataFrames

我已陷入这种困境,我无法选择哪种解决方案对我更好.我有一个非常大的表(几个100GB)和几个较小的(几个GB).为了在Spark中创建我的数据管道并使用spark ML,我需要加入这些表并执行几个GroupBy(聚合)操作.那些操作对我来说真的很慢,所以我选择了这两个中的一个:

  • 使用Cassandra并使用索引来加速GoupBy操作.
  • 根据数据布局使用Parquet和Partitioning.

我可以说Parquet分区工作速度更快,可扩展性更高,而且Cassandra使用的内存开销更少.所以问题是:

如果开发人员推断并了解数据布局及其使用方式,那么使用Parquet会不会更好,因为您可以更好地控制它?我为什么要为Cassandra带来的开销付出代价?

cassandra apache-spark parquet spark-dataframe

8
推荐指数
2
解决办法
6099
查看次数

如何使用pyspark计算apache spark数据框的大小?

有没有办法使用pyspark计算Apache spark数据框的大小(以字节为单位)?

apache-spark pyspark spark-dataframe

8
推荐指数
1
解决办法
2685
查看次数

从hive表中读取并使用spark sql写回来

我正在使用Spark SQL读取Hive表并将其分配给scala val

val x = sqlContext.sql("select * from some_table")
Run Code Online (Sandbox Code Playgroud)

然后我正在使用数据帧x进行一些处理,最后得到一个数据帧y,它具有与表some_table一样的精确模式.

最后,我试图将y数据帧覆盖到同一个hive表some_table

y.write.mode(SaveMode.Overwrite).saveAsTable().insertInto("some_table")
Run Code Online (Sandbox Code Playgroud)

然后我收到错误

org.apache.spark.sql.AnalysisException:无法将覆盖插入到也从中读取的表中

我尝试创建一个insert sql语句并使用sqlContext.sql()触发它,但它也给了我同样的错误.

有什么办法可以绕过这个错误吗?我需要将记录插回到同一个表中.


嗨,我尝试按照建议做,但仍然得到相同的错误.

val x = sqlContext.sql("select * from incremental.test2")
val y = x.limit(5)
y.registerTempTable("temp_table")
val dy = sqlContext.table("temp_table")
dy.write.mode("overwrite").insertInto("incremental.test2")

scala> dy.write.mode("overwrite").insertInto("incremental.test2")
             org.apache.spark.sql.AnalysisException: Cannot insert overwrite into table that is also being read from.;
Run Code Online (Sandbox Code Playgroud)

hadoop scala apache-spark apache-spark-sql spark-dataframe

8
推荐指数
2
解决办法
1万
查看次数

PySpark:TypeError:'Column'对象不可调用

我正在从HDFS加载数据,我希望通过特定变量进行过滤.但不知何故,Column.isin命令不起作用.它抛出此错误:

TypeError:'Column'对象不可调用

from pyspark.sql.functions import udf, col
variables = ('852-PI-769', '812-HC-037', '852-PC-571-OUT')
df = sqlContext.read.option("mergeSchema", "true").parquet("parameters.parquet")
same_var = col("Variable").isin(variables)
df2 = df.filter(same_var)
Run Code Online (Sandbox Code Playgroud)

架构如下所示:

df.printSchema()
root
 |-- Time: timestamp (nullable = true)
 |-- Value: float (nullable = true)
 |-- Variable: string (nullable = true)
Run Code Online (Sandbox Code Playgroud)

知道我做错了什么吗?PS:这是使用Jupyter笔记本的Spark 1.4.

python apache-spark pyspark spark-dataframe

8
推荐指数
1
解决办法
2万
查看次数

Spark"用0替换null"性能比较

Spark 1.6.1,Scala api.

对于数据帧,我需要将某个列的所有空值替换为0.我有两种方法可以做到这一点.1.

myDF.withColumn("pipConfidence", when($"mycol".isNull, 0).otherwise($"mycol"))
Run Code Online (Sandbox Code Playgroud)

2.

myDF.na.fill(0, Seq("mycol"))
Run Code Online (Sandbox Code Playgroud)

它们基本相同还是一种方式首选?

谢谢!

apache-spark spark-dataframe

8
推荐指数
1
解决办法
1万
查看次数

java.lang.NoClassDefFoundError:在scala代码中通过spark-submit启动spark作业时无法初始化类

我有一个代码,如下所示

 object ErrorTest {
case class APIResults(status:String, col_1:Long, col_2:Double, ...)

def funcA(rows:ArrayBuffer[Row])(implicit defaultFormats:DefaultFormats):ArrayBuffer[APIResults] = {
  //call some API ang get results and return APIResults
  ...
}

// MARK: load properties
val props = loadProperties()
private def loadProperties(): Properties =  {
  val configFile = new File("config.properties")
  val reader = new FileReader(configFile)
  val props = new Properties()
  props.load(reader)
  props
}

def main(args: Array[String]): Unit = {
  val prop_a = props.getProperty("prop_a")

  val session = Context.initialSparkSession();
  import session.implicits._

  val initialSet = ArrayBuffer.empty[Row]
  val addToSet …

java scala apache-spark apache-spark-sql spark-dataframe

8
推荐指数
1
解决办法
5201
查看次数