标签: apache-spark

在什么情况下我可以使用Dask而不是Apache Spark?

我目前正在使用Pandas和Spark进行数据分析.我发现Dask提供了并行化的NumPy数组和Pandas DataFrame.

Pandas在Python中进行数据分析非常简单直观.但由于系统内存有限,我发现难以在Pandas中处理多个更大的数据帧.

简单回答:

Apache Spark是一个包含分布式计算,SQL查询,机器学习等在JVM上运行的全包框架,通常与Hadoop等其他大数据框架共同部署....通常Dask比Spark更小,重量更轻.

我从http://dask.pydata.org/en/latest/spark.html了解下面的详细信息

  • Dask重量轻
  • Dask通常在单个计算机上使用,但也可以在分布式群集上运行良好.
  • Dask提供并行数组,数据帧,机器学习和自定义算法
  • Dask对Python用户有一个优势,因为它本身就是一个Python库,因此当出现问题时进行序列化和调试会更顺利.
  • Dask放弃了高级别的理解,允许用户表达更复杂的并行算法.
  • Dask重量更轻,更易于集成到现有代码和硬件中.
  • 如果你想要一个可以完成所有事情并且你已经在大数据硬件上的项目,那么Spark是一个安全的选择
  • Spark通常用于中小型集群,但也可在单台机器上运行良好.

我从以下链接了解有关Dask的更多信息 https://www.continuum.io/blog/developer-blog/high-performance-hadoop-anaconda-and-dask-your-cluster

  • 如果您在使用Pandas,NumPy或其他使用Python的计算时遇到内存问题,存储限制或单个计算机上的CPU边界,Dask可以帮助您扩展单个计算机上的所有核心,或者向外扩展在群集中的所有核心和内存上.
  • Dask在一台机器上运行良好,可以利用笔记本电脑上的所有内核并处理大于内存的数据
  • 在具有数百个节点的群集上弹性地弹性扩展.
  • Dask使用Python本地工作,具有不同格式和存储系统的数据,包括Hadoop分布式文件系统(HDFS)和Amazon S3.Anaconda和Dask可以与您现有的企业Hadoop发行版配合使用,包括Cloudera CDH和Hortonworks HDP.

http://dask.pydata.org/en/latest/dataframe-overview.html

限制

Dask.DataFrame不实现整个Pandas接口.期望这样的用户会感到失望.但是,dask.dataframe有以下限制:

  1. 从未排序的列设置新索引非常昂贵
  2. 许多操作,例如groupby-apply和join on unsorted columns,需要设置索引,如上所述,索引很昂贵
  3. Pandas API非常庞大.Dask.dataframe不会尝试实现许多pandas功能或任何更奇特的数据结构,如NDFrame

感谢Dask开发人员.这似乎是非常有前途的技术.

总的来说,我可以理解Dask比spark更容易使用.Dask与Pandas一样灵活,具有更大的计算能力和更多的CPU.

我理解关于Dask的所有上述事实.

那么,使用Dask大致可以处理多少数据量(以TB为单位)?

python bigdata pandas apache-spark dask

63
推荐指数
1
解决办法
2万
查看次数

如何删除pyspark数据帧中的列

>>> a
DataFrame[id: bigint, julian_date: string, user_id: bigint]
>>> b
DataFrame[id: bigint, quan_created_money: decimal(10,0), quan_created_cnt: bigint]
>>> a.join(b, a.id==b.id, 'outer')
DataFrame[id: bigint, julian_date: string, user_id: bigint, id: bigint, quan_created_money: decimal(10,0), quan_created_cnt: bigint]
Run Code Online (Sandbox Code Playgroud)

有两个id: bigint,我想删除一个.我能怎么做?

apache-spark apache-spark-sql pyspark

61
推荐指数
5
解决办法
13万
查看次数

DAG如何在RDD的幕后工作?

星火研究论文已规定了新的分布式编程模型,相比于传统的Hadoop MapReduce的,声称在许多情况下,特别是机器学习的简化和广阔的性能提升.但是,材料揭开internal mechanicsResilient Distributed DatasetsDirected Acyclic Graph似乎缺乏本文.

通过调查源代码可以更好地学习吗?

directed-acyclic-graphs apache-spark rdd

60
推荐指数
2
解决办法
4万
查看次数

如何将PySpark中的表数据框导出到csv?

我正在使用Spark 1.3.1(PySpark),我使用SQL查询生成了一个表.我现在有一个对象DataFrame.我想将此DataFrame对象(我将其称为"表")导出到csv文件,以便我可以操作它并绘制列.如何将DataFrame"表" 导出到csv文件?

谢谢!

python dataframe export-to-csv apache-spark apache-spark-sql

59
推荐指数
5
解决办法
15万
查看次数

有没有办法获取Spark Dataframe的前1000行?

我正在使用该randomSplit函数来获取少量的数据帧以用于开发目的,我最终只取这个函数返回的第一个df.

val df_subset = data.randomSplit(Array(0.00000001, 0.01), seed = 12345)(0)
Run Code Online (Sandbox Code Playgroud)

如果我使用df.take(1000)那么我最终得到一个行数组 - 而不是数据帧,所以这对我不起作用.

有没有更好,更简单的方法来说出df的前1000行并将其存储为另一个df?

scala apache-spark

59
推荐指数
1
解决办法
8万
查看次数

如何在Spark 2.0+中编写单元测试?

我一直试图找到一种合理的方法来测试SparkSessionJUnit测试框架.虽然似乎有很好的例子SparkContext,但我无法弄清楚如何使用相应的示例SparkSession,即使它在spark-testing-base内部的几个地方使用过.我很乐意尝试一种不使用spark-testing-base的解决方案,如果它不是真正正确的方式去这里.

简单的测试用例(完整MWE项目build.sbt):

import com.holdenkarau.spark.testing.DataFrameSuiteBase
import org.junit.Test
import org.scalatest.FunSuite

import org.apache.spark.sql.SparkSession


class SessionTest extends FunSuite with DataFrameSuiteBase {

  implicit val sparkImpl: SparkSession = spark

  @Test
  def simpleLookupTest {

    val homeDir = System.getProperty("user.home")
    val training = spark.read.format("libsvm")
      .load(s"$homeDir\\Documents\\GitHub\\sample_linear_regression_data.txt")
    println("completed simple lookup test")
  }

}
Run Code Online (Sandbox Code Playgroud)

使用JUnit运行它的结果是加载线上的NPE:

java.lang.NullPointerException
    at SessionTest.simpleLookupTest(SessionTest.scala:16)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:50)
    at org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
    at org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:47)
    at org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
    at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:325)
    at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:78)
    at …
Run Code Online (Sandbox Code Playgroud)

junit unit-testing scala apache-spark apache-spark-sql

59
推荐指数
4
解决办法
4万
查看次数

如何将Column.isin与列表一起使用?

val items = List("a", "b", "c")

sqlContext.sql("select c1 from table")
          .filter($"c1".isin(items))
          .collect
          .foreach(println)
Run Code Online (Sandbox Code Playgroud)

上面的代码抛出以下异常.

Exception in thread "main" java.lang.RuntimeException: Unsupported literal type class scala.collection.immutable.$colon$colon List(a, b, c) 
at org.apache.spark.sql.catalyst.expressions.Literal$.apply(literals.scala:49)
at org.apache.spark.sql.functions$.lit(functions.scala:89)
at org.apache.spark.sql.Column$$anonfun$isin$1.apply(Column.scala:642)
at org.apache.spark.sql.Column$$anonfun$isin$1.apply(Column.scala:642)
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:245)
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:245)
at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
at scala.collection.mutable.WrappedArray.foreach(WrappedArray.scala:35)
at scala.collection.TraversableLike$class.map(TraversableLike.scala:245)
at scala.collection.AbstractTraversable.map(Traversable.scala:104)
at org.apache.spark.sql.Column.isin(Column.scala:642)
Run Code Online (Sandbox Code Playgroud)

以下是我尝试修复它.它编译并运行但不返回任何匹配.不知道为什么.

val items = List("a", "b", "c").mkString("\"","\",\"","\"")

sqlContext.sql("select c1 from table")
          .filter($"c1".isin(items))
          .collect
          .foreach(println)
Run Code Online (Sandbox Code Playgroud)

scala apache-spark apache-spark-sql

58
推荐指数
3
解决办法
4万
查看次数

Spark Dataframe区分具有重复名称的列

正如我在Spark Dataframe中所知,多列的名称可以与下面的数据帧快照中显示的名称相同:

[
Row(a=107831, f=SparseVector(5, {0: 0.0, 1: 0.0, 2: 0.0, 3: 0.0, 4: 0.0}), a=107831, f=SparseVector(5, {0: 0.0, 1: 0.0, 2: 0.0, 3: 0.0, 4: 0.0})),
Row(a=107831, f=SparseVector(5, {0: 0.0, 1: 0.0, 2: 0.0, 3: 0.0, 4: 0.0}), a=125231, f=SparseVector(5, {0: 0.0, 1: 0.0, 2: 0.0047, 3: 0.0, 4: 0.0043})),
Row(a=107831, f=SparseVector(5, {0: 0.0, 1: 0.0, 2: 0.0, 3: 0.0, 4: 0.0}), a=145831, f=SparseVector(5, {0: 0.0, 1: 0.2356, 2: 0.0036, 3: 0.0, 4: 0.4132})),
Row(a=107831, f=SparseVector(5, {0: 0.0, 1: …
Run Code Online (Sandbox Code Playgroud)

python dataframe apache-spark apache-spark-sql pyspark

58
推荐指数
8
解决办法
8万
查看次数

如何使用JDBC源在(Py)Spark中写入和读取数据?

这个问题的目标是记录:

  • 在PySpark中使用JDBC连接读取和写入数据所需的步骤

  • JDBC源和已知解决方案可能存在的问题

通过小的更改,这些方法应该与其他支持的语言一起使用,包括Scala和R.

python scala apache-spark apache-spark-sql pyspark

57
推荐指数
1
解决办法
6万
查看次数

Apache Spark vs Akka

你能否告诉我Apache Spark和AKKA之间的区别,我知道这两个框架都意味着编程分布式和并行计算,但我没有看到它们之间的链接或区别.

此外,我想得到适合他们每个人的用例.

parallel-processing distributed-computing bigdata akka apache-spark

56
推荐指数
3
解决办法
4万
查看次数