Tag: apache-spark

Splitting a Spark DataFrame by row index

I want to split a DataFrame by row order. If there are 100 rows, the desired split into 4 equal DataFrames should have indices 0-24, 25-49, 50-74, and 75-99 respectively.

The only predefined function available is randomSplit, but randomSplit shuffles the data before splitting. The other approach I can think of is to get the row count with a count action and then keep pulling rows out with take, which is very expensive. Is there any other way to achieve this while keeping the original order?
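A minimal sketch of one possible approach: zipWithIndex assigns consecutive positions in the DataFrame's current order, so four contiguous index ranges can be filtered out (it still needs a single count to compute the boundaries, but avoids repeated take calls):

from pyspark.sql.functions import col

# Pair every row with its position; the result has a struct column "row"
# (the original columns) and a long column "idx".
indexed = df.rdd.zipWithIndex().toDF(["row", "idx"])

n = indexed.count()
bounds = [i * n // 4 for i in range(5)]   # 0, 25, 50, 75, 100 for n = 100

# Four contiguous slices in the original order; "row.*" unpacks the struct
# back into the original columns.
parts = [indexed.filter((col("idx") >= bounds[i]) & (col("idx") < bounds[i + 1]))
                .select("row.*")
         for i in range(4)]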

python apache-spark pyspark

1 vote
1 answer
1497 views

How to write data from Spark to Cassandra

I am new to Spark and Cassandra. I am using this code, but it gives me an error.

val dfprev = df.select(col = "se","hu")
val a = dfprev.select("se")
val b = dfprev.select("hu")
val collection = sc.parallelize(Seq(a,b))
collection.saveToCassandra("keyspace", "table", SomeColumns("se","hu"))

When I run this code, it fails at saveToCassandra with the following error:

java.lang.IllegalArgumentException: Multiple constructors with the same number of parameters not allowed.
    at com.datastax.spark.connector.util.Reflect$.methodSymbol(Reflect.scala:16)
    at com.datastax.spark.connector.util.ReflectionUtil$.constructorParams(ReflectionUtil.scala:63)
    at com.datastax.spark.connector.mapper.DefaultColumnMapper.<init>(DefaultColumnMapper.scala:45)
    at com.datastax.spark.connector.mapper.LowPriorityColumnMapper$class.defaultColumnMapper(ColumnMapper.scala:51)
    at com.datastax.spark.connector.mapper.ColumnMapper$.defaultColumnMapper(ColumnMapper.scala:55)
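The error appears to come from sc.parallelize(Seq(a, b)): a and b are themselves DataFrames, and the connector cannot map DataFrame objects to Cassandra columns. A minimal sketch of one possible fix, writing the selected columns directly through the connector's DataFrame API (keyspace and table names are placeholders, and the spark-cassandra-connector package must be on the classpath):

import org.apache.spark.sql.SaveMode

// Keep both columns in one DataFrame instead of splitting and re-parallelizing.
val dfprev = df.select("se", "hu")

// Write the DataFrame straight to Cassandra through the connector's data source.
dfprev.write
  .format("org.apache.spark.sql.cassandra")
  .options(Map("keyspace" -> "keyspace", "table" -> "table"))
  .mode(SaveMode.Append)
  .save()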

hadoop scala cassandra apache-spark apache-spark-sql

1 vote
1 answer
2248 views

How to correctly install Zeppelin and load PySpark?

First, download and extract the .tgz:

tar xvf zeppelin-0.7.3-bin-all.tgz

Second, set the SPARK_HOME variable:

vi ~/.bashrc

and add:

export SPARK_HOME="/home/miguel/spark-2.3.0-bin-hadoop2.7/"

Third, launch Zeppelin from the command line:

bin/zeppelin-daemon.sh start

Fourth, try to run PySpark:

%pyspark
print("Hello")

I get this error:

java.lang.ClassNotFoundException: org.apache.spark.ui.jobs.JobProgressListener
    at java.net.URLClassLoader.findClass(URLClassLoader.java:381)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
    at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:335)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
    at org.apache.zeppelin.spark.SparkInterpreter.setupListeners(SparkInterpreter.java:170)
    at org.apache.zeppelin.spark.SparkInterpreter.getSparkContext(SparkInterpreter.java:148)
    at org.apache.zeppelin.spark.SparkInterpreter.open(SparkInterpreter.java:843)
    at org.apache.zeppelin.interpreter.LazyOpenInterpreter.open(LazyOpenInterpreter.java:70)
    at org.apache.zeppelin.spark.PySparkInterpreter.getSparkInterpreter(PySparkInterpreter.java:565)
    at org.apache.zeppelin.spark.PySparkInterpreter.createGatewayServerAndStartScript(PySparkInterpreter.java:209)
    at org.apache.zeppelin.spark.PySparkInterpreter.open(PySparkInterpreter.java:162)
    at org.apache.zeppelin.interpreter.LazyOpenInterpreter.open(LazyOpenInterpreter.java:70)
    at org.apache.zeppelin.interpreter.remote.RemoteInterpreterServer$InterpretJob.jobRun(RemoteInterpreterServer.java:491)
    at org.apache.zeppelin.scheduler.Job.run(Job.java:175)
    at org.apache.zeppelin.scheduler.FIFOScheduler$1.run(FIFOScheduler.java:139)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)
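The ClassNotFoundException points at org.apache.spark.ui.jobs.JobProgressListener, which was removed in Spark 2.3 while Zeppelin 0.7.3 still expects it, so Zeppelin 0.7.x only works with Spark 2.2 or earlier. A minimal sketch of one workaround, pointing Zeppelin at a Spark 2.2.x build via conf/zeppelin-env.sh (the path is a placeholder; upgrading to Zeppelin 0.8+ is the other option):

cp conf/zeppelin-env.sh.template conf/zeppelin-env.sh
echo 'export SPARK_HOME="/home/miguel/spark-2.2.3-bin-hadoop2.7/"' >> conf/zeppelin-env.sh
bin/zeppelin-daemon.sh restart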

apache-spark pyspark apache-zeppelin

1 vote
1 answer
1177 views

List of spark-submit options

A large number of tunable settings is mentioned on the Spark configuration page. However, as noted in SparkSubmitOptionParser, the attribute name of a Spark property can differ from the property's name.

For example, spark.executor.cores is passed as --executor-cores to spark-submit.


Where can I find an exhaustive list of all the tuning parameters of Spark (with their SparkSubmitOptionParser attribute names) that can be passed to the spark-submit command?
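As a short illustration of the relationship, the flag names understood by spark-submit itself can be printed locally, and any property without a dedicated flag can still be set with the generic --conf key=value form (class and jar names below are placeholders):

# Print every flag spark-submit itself understands (the SparkSubmitOptionParser names)
spark-submit --help

# Both invocations set the same property: a dedicated flag vs. the generic --conf form
spark-submit --executor-cores 4 --class com.example.App app.jar
spark-submit --conf spark.executor.cores=4 --class com.example.App app.jar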

apache-spark spark-submit

1 vote
1 answer
10k views

Reading a JSON file into a Spark DataFrame with PySpark?

How can I read the following JSON structure into a Spark DataFrame using PySpark?

My JSON structure:

{"results":[{"a":1,"b":2,"c":"name"},{"a":2,"b":5,"c":"foo"}]}

I have tried:

df = spark.read.json('simple.json');

I would like the output to have a, b, and c as columns and the values as the corresponding rows.

Thanks.
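A minimal sketch of one way to flatten this structure: with the whole object on a single line, spark.read.json yields one row holding a results array of structs, which explode expands into one row per element (a pretty-printed multi-line file would need the multiLine option, available from Spark 2.2):

from pyspark.sql.functions import explode

df = spark.read.json('simple.json')

# 'results' is an array of structs; explode gives one row per element,
# then the struct fields become the a, b, c columns.
flat = df.select(explode('results').alias('r')).select('r.a', 'r.b', 'r.c')
flat.show()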

python apache-spark apache-spark-sql pyspark

1 vote
1 answer
7484 views

Spark DataFrame: convert an integer to a timestamp and find the difference in days

I have this DataFrame (org.apache.spark.sql.DataFrame):

|-- timestamp: integer (nullable = true)
|-- checkIn: string (nullable = true)

| timestamp|   checkIn|
+----------+----------+
|1521710892|2018-05-19|
|1521710892|2018-05-19|

Desired result: get a new column with the difference in days between checkIn and the date derived from timestamp (and the day difference between 2018-03-03 23:59:59 and 2018-03-04 00:00:01 should be 1).

So I need to:

  • Convert the timestamp to a date (this is where I am stuck)
  • Subtract one date from the other
  • Extract the difference in days with some function (have not found one yet; see the sketch after this list)
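A minimal sketch covering the three steps, using the column names from the question (datediff counts crossed day boundaries, so 23:59:59 vs. 00:00:01 yields 1):

import org.apache.spark.sql.functions.{col, datediff, from_unixtime, to_date}

// from_unixtime turns the epoch seconds into a timestamp string,
// to_date truncates it to a date, and datediff counts the day boundaries
// crossed between the two dates.
val result = df
  .withColumn("tsDate", to_date(from_unixtime(col("timestamp"))))
  .withColumn("daysBetween", datediff(col("checkIn"), col("tsDate")))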

scala apache-spark

1 vote
1 answer
3990 views

Why does Spark fail with "value rdf is not a member of org.apache.spark.sql.SparkSession"?

I am trying to use SANSA-RDF to read a Turtle RDF file into Spark and create a graph. I get the error when executing the following code. What am I missing?

    import org.apache.jena.query.QueryFactory
    import org.apache.jena.riot.Lang
    import org.apache.spark.sql.SparkSession
    import net.sansa_stack.rdf.spark.io.rdf._
    import net.sansa_stack.rdf.spark.io._
    import scala.io.Source

    object SparkExecutor {
      private var ss:SparkSession = null

      def ConfigureSpark(): Unit ={

        ss = SparkSession.builder
          .master("local[*]")
          .config("spark.driver.cores", 1)
          .appName("LAM")
          .getOrCreate()

      }

      def createGraph(): Unit ={
        val filename = "xyz.ttl"
        print("Loading graph from file"+ filename)
        val lang = Lang.TTL
        val triples = ss.rdf(lang)(filename)
        val graph = LoadGraph(triples)    
      }
    }

I am calling SparkExecutor from the main function:

    object main {
      def main(args: Array[String]): Unit = {
        SparkExecutor.ConfigureSpark()
        val RDFGraph = SparkExecutor.createGraph()
      }
    } …

rdf scala apache-spark turtle-rdf

1 vote
1 answer
288 views

Changing the date format of column values in a Spark DataFrame

I am reading an Excel sheet into a DataFrame in Spark 2.0, and then trying to convert some columns with date values from MM/DD/YY into YYYY-MM-DD format. The values are strings. Here is a sample:

+---------------+--------------+
|modified       |      created |
+---------------+--------------+
|           null| 12/4/17 13:45|
|        2/20/18|  2/2/18 20:50|
|        3/20/18|  2/2/18 21:10|
|        2/20/18|  2/2/18 21:23|
|        2/28/18|12/12/17 15:42| 
|        1/25/18| 11/9/17 13:10|
|        1/29/18| 12/6/17 10:07| 
+---------------+--------------+

I would like to convert it to:

+---------------+-----------------+
|modified       |      created    |
+---------------+-----------------+
|           null| 2017-12-04 13:45|
|     2018-02-20| 2018-02-02 20:50|
|     2018-03-20| 2018-02-02 21:10|
|     2018-02-20| 2018-02-02 21:23|
|     2018-02-28| 2017-12-12 15:42| 
|     2018-01-25| 2017-11-09 13:10|
|     2018-01-29| 2017-12-06 10:07| 
+---------------+-----------------+ …
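A minimal sketch of one approach; note that to_date/to_timestamp with an explicit pattern require Spark 2.2+, while on 2.0/2.1 the same conversion can be built from unix_timestamp plus from_unixtime:

import org.apache.spark.sql.functions.{col, date_format, to_date, to_timestamp}

// Parse the strings with their current pattern, then render them in the target pattern.
val converted = df
  .withColumn("modified", date_format(to_date(col("modified"), "M/d/yy"), "yyyy-MM-dd"))
  .withColumn("created", date_format(to_timestamp(col("created"), "M/d/yy H:mm"), "yyyy-MM-dd HH:mm"))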

scala dataframe apache-spark apache-spark-sql

1 vote
1 answer
7055 views

java.io.NotSerializableException with Spark Streaming checkpointing enabled

I enabled checkpointing in my Spark Streaming application and am getting this error on a class that is pulled in as a dependency.

Without checkpointing, the application runs fine.

Error:

com.fasterxml.jackson.module.paranamer.shaded.CachingParanamer
Serialization stack:
    - object not serializable (class: com.fasterxml.jackson.module.paranamer.shaded.CachingParanamer, value: com.fasterxml.jackson.module.paranamer.shaded.CachingParanamer@46c7c593)
    - field (class: com.fasterxml.jackson.module.paranamer.ParanamerAnnotationIntrospector, name: _paranamer, type: interface com.fasterxml.jackson.module.paranamer.shaded.Paranamer)
    - object (class com.fasterxml.jackson.module.paranamer.ParanamerAnnotationIntrospector, com.fasterxml.jackson.module.paranamer.ParanamerAnnotationIntrospector@39d62e47)
    - field (class: com.fasterxml.jackson.databind.introspect.AnnotationIntrospectorPair, name: _secondary, type: class com.fasterxml.jackson.databind.AnnotationIntrospector)
    - object (class com.fasterxml.jackson.databind.introspect.AnnotationIntrospectorPair, com.fasterxml.jackson.databind.introspect.AnnotationIntrospectorPair@7a925ac4)
    - field (class: com.fasterxml.jackson.databind.introspect.AnnotationIntrospectorPair, name: _primary, type: class com.fasterxml.jackson.databind.AnnotationIntrospector)
    - object (class com.fasterxml.jackson.databind.introspect.AnnotationIntrospectorPair, com.fasterxml.jackson.databind.introspect.AnnotationIntrospectorPair@203b98cf)
    - field (class: com.fasterxml.jackson.databind.cfg.BaseSettings, name: _annotationIntrospector, type: class com.fasterxml.jackson.databind.AnnotationIntrospector)
    - object (class com.fasterxml.jackson.databind.cfg.BaseSettings, com.fasterxml.jackson.databind.cfg.BaseSettings@78c34153)
    - field (class: com.fasterxml.jackson.databind.cfg.MapperConfig, name: _base, type: class com.fasterxml.jackson.databind.cfg.BaseSettings)
    - …
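The serialization stack suggests a Jackson ObjectMapper (with the shaded Paranamer introspector pulled in by jackson-module-scala/paranamer) is being captured in a closure that checkpointing has to serialize. A minimal sketch of one common workaround, assuming the mapper is the captured object: keep it in a singleton so each JVM rebuilds it lazily instead of serializing it with the checkpoint.

import com.fasterxml.jackson.databind.ObjectMapper
import com.fasterxml.jackson.module.scala.DefaultScalaModule

// A singleton holder: the mapper is built lazily on each JVM and is never
// serialized; only the reference to the object is captured by the closure.
object JsonMapper {
  @transient lazy val mapper: ObjectMapper = {
    val m = new ObjectMapper()
    m.registerModule(DefaultScalaModule)
    m
  }
}

// Inside DStream transformations, call JsonMapper.mapper instead of using a
// mapper instance created on the driver.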

scala apache-spark spark-streaming

1 vote
1 answer
307 views

Adding a vector column to a pyspark DataFrame

How can I add a Vectors.dense column to a pyspark DataFrame?

import pandas as pd
from pyspark import SparkContext
from pyspark.sql import SQLContext
from pyspark.ml.linalg import DenseVector

py_df = pd.DataFrame.from_dict({"time": [59., 115., 156., 421.], "event": [1, 1, 1, 0]})

sc = SparkContext(master="local")
sqlCtx = SQLContext(sc)
sdf = sqlCtx.createDataFrame(py_df)
sdf.withColumn("features", DenseVector(1))

This gives an error at line 1848 of anaconda3/lib/python3.6/site-packages/pyspark/sql/dataframe.py:

AssertionError: col should be Column

It does not accept the DenseVector type as a column. Essentially, I have a pandas DataFrame that I want to convert to a pyspark DataFrame and add a column of type Vectors.dense. Is there another way to do this?
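A minimal sketch of one workaround, assuming the vector should be built from the time column: withColumn needs a Column expression, so the DenseVector construction is wrapped in a UDF whose declared return type is VectorUDT.

from pyspark.sql.functions import udf
from pyspark.ml.linalg import Vectors, VectorUDT

# Build a DenseVector from the existing numeric column inside a UDF.
to_vector = udf(lambda t: Vectors.dense([t]), VectorUDT())
sdf = sdf.withColumn("features", to_vector("time"))
sdf.show()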

dataframe apache-spark pyspark apache-spark-ml

1 vote
1 answer
1122 views