标签: sparkr

如何将csv读入sparkR ver 1.4?

随着spark(1.4)的新版本发布,似乎有一个很好的前端interfeace sparkR名为包sparkR.在R for spark文档页面上,有一个命令可以将json文件作为RDD对象读取

people <- read.df(sqlContext, "./examples/src/main/resources/people.json", "json")
Run Code Online (Sandbox Code Playgroud)

我试图从这个革命政治博客中.csv描述的文件中读取数据

# Download the nyc flights dataset as a CSV from https://s3-us-west-2.amazonaws.com/sparkr-data/nycflights13.csv

# Launch SparkR using 
# ./bin/sparkR --packages com.databricks:spark-csv_2.10:1.0.3

# The SparkSQL context should already be created for you as sqlContext
sqlContext
# Java ref type org.apache.spark.sql.SQLContext id 1

# Load the flights CSV file using `read.df`. Note that we use the CSV reader …
Run Code Online (Sandbox Code Playgroud)

csv r apache-spark apache-spark-sql sparkr

7
推荐指数
2
解决办法
6309
查看次数

RStudio中sparkR.init(master ="local")中的SparkR错误

我已经将Spark发行版中的SparkR软件包安装到R库中.我可以调用以下命令,它似乎正常工作:library(SparkR)

但是,当我尝试使用以下代码获取Spark上下文时,

sc <- sparkR.init(master="local")
Run Code Online (Sandbox Code Playgroud)

一段时间后,它会因以下消息而失败:

Error in sparkR.init(master = "local") :
   JVM is not ready after 10 seconds
Run Code Online (Sandbox Code Playgroud)

我已经设置了JAVA_HOME,我有一个工作的RStudio,我可以访问其他软件包,如ggplot2.我不知道为什么它不起作用,我甚至不知道在哪里调查这个问题.

rstudio apache-spark sparkr

7
推荐指数
1
解决办法
2156
查看次数

在SparkR中使用apply函数

我目前正在尝试使用sparkR版本1.5.1实现一些功能.我见过较旧的(版本1.3)示例,其中人们在DataFrame上使用了apply函数,但看起来它不再直接可用.例:

x = c(1,2)
xDF_R = data.frame(x)
colnames(xDF_R) = c("number")
xDF_S = createDataFrame(sqlContext,xDF_R)
Run Code Online (Sandbox Code Playgroud)

现在,我可以在data.frame对象上使用sapply函数

xDF_R$result = sapply(xDF_R$number, ppois, q=10)
Run Code Online (Sandbox Code Playgroud)

当我在DataFrame上使用类似的逻辑时

xDF_S$result = sapply(xDF_S$number, ppois, q=10)
Run Code Online (Sandbox Code Playgroud)

我收到错误消息"as.list.default(X)中的错误:没有将此S4类强制转换为向量的方法"

我能以某种方式这样做吗?

sparkr

7
推荐指数
1
解决办法
770
查看次数

为什么在SparkR中收集这么慢?

我有一个500K行火花DataFrame,它位于镶木地板文件中.我正在使用spark 2.0.0和SparkRSpark(RStudio和R 3.3.1)中的软件包,它们都运行在具有4个内核和8GB RAM的本地机器上.

为了便于构建我可以在R中工作的数据集,我使用该collect()方法将spark DataFrame引入R.这样做大约需要3分钟,这比使用它读取等效大小的CSV文件要长得多.data.table包.

不可否认,镶木地板文件是压缩的,减压所需的时间可能是问题的一部分,但我在互联网上发现其他关于收集方法的评论特别慢,而且解释方式也很少.

我在sparklyr中尝试了相同的操作,而且速度要快得多.不幸的是,sparklyr没有像SparkR一样容易在连接和过滤器内部进行日期路径,因此我使用SparkR.另外,我不相信我可以同时使用这两个包(即使用SparkR调用运行查询,然后使用sparklyr访问这些spark对象).

有没有人有类似的经验,解释SparkR的collect()方法相对缓慢,和/或任何解决方案?

r apache-spark sparkr

7
推荐指数
1
解决办法
1773
查看次数

如何在C#中调用Sagemaker培训模型端点API

我通过sagemaker实现了机器学习算法.

我已经为.net安装了SDK,并尝试执行下面的代码.

Uri sagemakerEndPointURI = new Uri("https://runtime.sagemaker.us-east-2.amazonaws.com/endpoints/MyEndpointName/invocations");
Amazon.SageMakerRuntime.Model.InvokeEndpointRequest request = new Amazon.SageMakerRuntime.Model.InvokeEndpointRequest();
request.EndpointName = "MyEndpointName";
AmazonSageMakerRuntimeClient aawsClient = new AmazonSageMakerRuntimeClient(myAwsAccessKey,myAwsSecreteKey);            
Amazon.SageMakerRuntime.Model.InvokeEndpointResponse resposnse= aawsClient.InvokeEndpoint(request);
Run Code Online (Sandbox Code Playgroud)

通过执行此操作,我得到验证错误为" 1 validation error detected: Value at 'body' failed to satisfy constraint: Member must not be null"

任何人都可以指导我如何以及需要传递多少输入数据来调用给定的API?

编辑

此外,我尝试通过provinding body参数,其中包含由'.gz'或'.pkl'文件编写的MemoryStream,并且它给出了错误:"错误解组来自AWS的响应,HTTP内容长度超过5246976字节."

编辑1/23/2018

此外,我想出了错误消息

错误 - 模型服务器 - 'TypeError'对象没有属性'message'

谢谢

c# amazon-s3 amazon-web-services sparkr amazon-sagemaker

7
推荐指数
1
解决办法
1190
查看次数

Spark Dataframe中的重复列

我在hadoop集群中有一个带有重复列的10GB csv文件.我尝试在SparkR中分析它,所以我使用spark-csv包来解析它DataFrame:

  df <- read.df(
    sqlContext,
    FILE_PATH,
    source = "com.databricks.spark.csv",
    header = "true",
    mode = "DROPMALFORMED"
  )
Run Code Online (Sandbox Code Playgroud)

但由于df有重复的Email列,如果我想选择此列,则会出错:

select(df, 'Email')

15/11/19 15:41:58 ERROR RBackendHandler: select on 1422 failed
Error in invokeJava(isStatic = FALSE, objId$id, methodName, ...) : 
  org.apache.spark.sql.AnalysisException: Reference 'Email' is ambiguous, could be: Email#350, Email#361.;
    at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.resolve(LogicalPlan.scala:278)
...
Run Code Online (Sandbox Code Playgroud)

我想保留第一次出现的Email列并删除后者,我该怎么做?

csv hadoop r apache-spark sparkr

6
推荐指数
2
解决办法
8379
查看次数

无法从SparkR创建的DataFrame中检索数据

我有以下简单SparkR程序,即从中创建SparkR DataFrame和检索/收集数据.

Sys.setenv(HADOOP_CONF_DIR = "/etc/hadoop/conf.cloudera.yarn")
Sys.setenv(SPARK_HOME = "/home/user/Downloads/spark-1.6.1-bin-hadoop2.6")
.libPaths(c(file.path(Sys.getenv("SPARK_HOME"), "R", "lib"), .libPaths()))
library(SparkR)
sc <- sparkR.init(master="yarn-client",sparkEnvir = list(spark.shuffle.service.enabled=TRUE,spark.dynamicAllocation.enabled=TRUE,spark.dynamicAllocation.initialExecutors="40"))
hiveContext <- sparkRHive.init(sc)

n = 1000
x = data.frame(id = 1:n, val = rnorm(n))
xs <- createDataFrame(hiveContext, x)

xs

head(xs)
collect(xs)
Run Code Online (Sandbox Code Playgroud)

我能够成功创建它并查看信息,但任何与获取数据相关的操作都会抛出错误.

16/07/25 16:33:59 WARN TaskSetManager:阶段17.0中丢失的任务0.3(TID 86,wlos06.nrm.minn.seagate.com):java.net.SocketTimeoutException:接受在java.net.PlainSocketImpl超时. java.net.ServerSocket.implAccept(ServerSocket.java:530)java.net.ServerSocket.accept(ServerSocket.java:498)java.net.AbstractPlainSocketImpl.accept(AbstractPlainSocketImpl.java:398)上的socketAccept(Native Method) org.apache.apark.api.r.RRDD $ .createRWorker(RRDD.scala:432)位于org.apache.spark的org.apache.spark.api.r.BaseRRDD.compute(RRDD.scala:63). rdd.RDD.computeOrReadCheckpoint(RDD.scala:306)atg.apache.spark.rdd.RDD.iterator(RDD.scala:270)org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)位于org.apache.spark.rdd.MapPartitionsRDD.compute的org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306)atg.apache.spark.rdd.RDD.iterator(RDD.scala:270) (MapPartitionsRDD.scala:38)位于org.apache.spark的org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306).rdd.RDD.iterator(RDD.scala:270)atg.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306) org.apache.spark.rdd.RDD.iterator(RDD.scala:270)at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66)at org.apache.spark.scheduler.Task.run (Task.scala:89)位于java.util的java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)的org.apache.spark.executor.Executor $ TaskRunner.run(Executor.scala:214). concurrent.ThreadPoolExecutor $ Worker.run(ThreadPoolExecutor.java:615)at java.lang.Thread.run(Thread.java:745)

16/07/25 16:33:59 ERROR TaskSetManager:阶段17.0中的任务0失败了4次; aborting job 16/07/25 16:33:59错误RBackendHandler:org.apache.spark.sql.api.r.SQLUtils上的dfToCols失败invokeJava中的错误(isStatic = TRUE,className,methodName,...):org. apache.spark.SparkException:作业因阶段失败而中止:阶段17.0中的任务0失败4次,最近失败:阶段17.0中丢失任务0.3(TID 86,wlos06.nrm.minn.seagate.com):java.net .SocketTimeoutException:在Java.net.Server.Socket.implAccept(ServerSocket.java:530)的java.net.AbstractPlainSocketImpl.accept(AbstractPlainSocketImpl.java:398)的java.net.PlainSocketImpl.socketAccept(Native …

hadoop hive r apache-spark sparkr

6
推荐指数
1
解决办法
1207
查看次数

在Spark中汇总多个列

如何在Spark中汇总多个列?例如,在SparkR中,以下代码用于获取一列的总和,但如果我尝试获取两列的总和,df则会出现错误.

# Create SparkDataFrame
df <- createDataFrame(faithful)

# Use agg to sum total waiting times
head(agg(df, totalWaiting = sum(df$waiting)))
##This works

# Use agg to sum total of waiting and eruptions
head(agg(df, total = sum(df$waiting, df$eruptions)))
##This doesn't work
Run Code Online (Sandbox Code Playgroud)

SparkR或PySpark代码都可以使用.

apache-spark pyspark sparkr

6
推荐指数
3
解决办法
2万
查看次数

SparkR DataFrame 分区问题

在我的 R 脚本中,我有一个SparkDataFrame包含四个不同月份数据的两列(时间、值)。由于我需要将我的函数分别应用到每个月,我想我会将repartition它分成四个分区,每个分区将保存一个月的数据。

我创建了一个名为 partition 的附加列,具有一个整数值 0 - 3,然后repartition通过此特定列调用该方法。

可悲的是,正如本主题中所描述的那样: Spark SQL - df.repartition 和 DataFrameWriter partitionBy 之间的区别?,使用该repartition方法我们只确定所有具有相同键的数据最终会在同一个分区中,但是具有不同键的数据也可以最终在同一个分区中。

就我而言,执行下面可见的代码会创建 4 个分区,但只用数据填充其中的 2 个。

我想我应该使用该partitionBy方法,但是在 SparkR 的情况下,我不知道该怎么做。官方文档指出,此方法适用于称为WindowSpec而不是DataFrame.

我真的很感激这方面的一些帮助,因为我不知道如何将此方法合并到我的代码中。

sparkR.session(
   master="local[*]",  sparkConfig = list(spark.sql.shuffle.partitions="4"))
df <- as.DataFrame(inputDat) # this is a dataframe with added partition column
repartitionedDf <- repartition(df, col = df$partition)

schema <- structType(
  structField("time", "timestamp"), 
  structField("value", "double"), 
  structField("partition", "string"))

processedDf <- dapply(repartitionedDf, 
  function(x) { data.frame(produceHourlyResults(x), …
Run Code Online (Sandbox Code Playgroud)

r apache-spark sparkr

6
推荐指数
1
解决办法
773
查看次数

使用SparkR查找生成主键的变量

这是我的玩具数据:

df <- tibble::tribble(
  ~var1, ~var2, ~var3, ~var4, ~var5, ~var6, ~var7,
    "A",   "C",    1L,    5L,  "AA",  "AB",    1L,
    "A",   "C",    2L,    5L,  "BB",  "AC",    2L,
    "A",   "D",    1L,    7L,  "AA",  "BC",    2L,
    "A",   "D",    2L,    3L,  "BB",  "CC",    1L,
    "B",   "C",    1L,    8L,  "AA",  "AB",    1L,
    "B",   "C",    2L,    6L,  "BB",  "AC",    2L,
    "B",   "D",    1L,    9L,  "AA",  "BC",    2L,
    "B",   "D",    2L,    6L,  "BB",  "CC",    1L)
Run Code Online (Sandbox Code Playgroud)

我在以下链接/sf/answers/3717723971/上的原始问题 是:

如何获得唯一标识数据框中观察结果的最小数量变量的组合,即哪些变量可以组成主键?以下答案/代码绝对正常,非常感谢thelatemail.

nms <- unlist(lapply(seq_len(length(df)), combn, x=names(df), simplify=FALSE), …
Run Code Online (Sandbox Code Playgroud)

r sparkr sparklyr

6
推荐指数
1
解决办法
145
查看次数