随着spark(1.4)的新版本发布,似乎有一个很好的前端interfeace spark从R名为包sparkR.在R for spark的文档页面上,有一个命令可以将json文件作为RDD对象读取
people <- read.df(sqlContext, "./examples/src/main/resources/people.json", "json")
Run Code Online (Sandbox Code Playgroud)
我试图从这个革命政治博客中.csv描述的文件中读取数据
# Download the nyc flights dataset as a CSV from https://s3-us-west-2.amazonaws.com/sparkr-data/nycflights13.csv
# Launch SparkR using
# ./bin/sparkR --packages com.databricks:spark-csv_2.10:1.0.3
# The SparkSQL context should already be created for you as sqlContext
sqlContext
# Java ref type org.apache.spark.sql.SQLContext id 1
# Load the flights CSV file using `read.df`. Note that we use the CSV reader …Run Code Online (Sandbox Code Playgroud) 我已经将Spark发行版中的SparkR软件包安装到R库中.我可以调用以下命令,它似乎正常工作:library(SparkR)
但是,当我尝试使用以下代码获取Spark上下文时,
sc <- sparkR.init(master="local")
Run Code Online (Sandbox Code Playgroud)
一段时间后,它会因以下消息而失败:
Error in sparkR.init(master = "local") :
JVM is not ready after 10 seconds
Run Code Online (Sandbox Code Playgroud)
我已经设置了JAVA_HOME,我有一个工作的RStudio,我可以访问其他软件包,如ggplot2.我不知道为什么它不起作用,我甚至不知道在哪里调查这个问题.
我目前正在尝试使用sparkR版本1.5.1实现一些功能.我见过较旧的(版本1.3)示例,其中人们在DataFrame上使用了apply函数,但看起来它不再直接可用.例:
x = c(1,2)
xDF_R = data.frame(x)
colnames(xDF_R) = c("number")
xDF_S = createDataFrame(sqlContext,xDF_R)
Run Code Online (Sandbox Code Playgroud)
现在,我可以在data.frame对象上使用sapply函数
xDF_R$result = sapply(xDF_R$number, ppois, q=10)
Run Code Online (Sandbox Code Playgroud)
当我在DataFrame上使用类似的逻辑时
xDF_S$result = sapply(xDF_S$number, ppois, q=10)
Run Code Online (Sandbox Code Playgroud)
我收到错误消息"as.list.default(X)中的错误:没有将此S4类强制转换为向量的方法"
我能以某种方式这样做吗?
我有一个500K行火花DataFrame,它位于镶木地板文件中.我正在使用spark 2.0.0和SparkRSpark(RStudio和R 3.3.1)中的软件包,它们都运行在具有4个内核和8GB RAM的本地机器上.
为了便于构建我可以在R中工作的数据集,我使用该collect()方法将spark DataFrame引入R.这样做大约需要3分钟,这比使用它读取等效大小的CSV文件要长得多.data.table包.
不可否认,镶木地板文件是压缩的,减压所需的时间可能是问题的一部分,但我在互联网上发现其他关于收集方法的评论特别慢,而且解释方式也很少.
我在sparklyr中尝试了相同的操作,而且速度要快得多.不幸的是,sparklyr没有像SparkR一样容易在连接和过滤器内部进行日期路径,因此我使用SparkR.另外,我不相信我可以同时使用这两个包(即使用SparkR调用运行查询,然后使用sparklyr访问这些spark对象).
有没有人有类似的经验,解释SparkR的collect()方法相对缓慢,和/或任何解决方案?
我通过sagemaker实现了机器学习算法.
我已经为.net安装了SDK,并尝试执行下面的代码.
Uri sagemakerEndPointURI = new Uri("https://runtime.sagemaker.us-east-2.amazonaws.com/endpoints/MyEndpointName/invocations");
Amazon.SageMakerRuntime.Model.InvokeEndpointRequest request = new Amazon.SageMakerRuntime.Model.InvokeEndpointRequest();
request.EndpointName = "MyEndpointName";
AmazonSageMakerRuntimeClient aawsClient = new AmazonSageMakerRuntimeClient(myAwsAccessKey,myAwsSecreteKey);
Amazon.SageMakerRuntime.Model.InvokeEndpointResponse resposnse= aawsClient.InvokeEndpoint(request);
Run Code Online (Sandbox Code Playgroud)
通过执行此操作,我得到验证错误为" 1 validation error detected: Value at 'body' failed to satisfy constraint: Member must not be null"
任何人都可以指导我如何以及需要传递多少输入数据来调用给定的API?
编辑
此外,我尝试通过provinding body参数,其中包含由'.gz'或'.pkl'文件编写的MemoryStream,并且它给出了错误:"错误解组来自AWS的响应,HTTP内容长度超过5246976字节."
编辑1/23/2018
此外,我想出了错误消息
错误 - 模型服务器 - 'TypeError'对象没有属性'message'
谢谢
我在hadoop集群中有一个带有重复列的10GB csv文件.我尝试在SparkR中分析它,所以我使用spark-csv包来解析它DataFrame:
df <- read.df(
sqlContext,
FILE_PATH,
source = "com.databricks.spark.csv",
header = "true",
mode = "DROPMALFORMED"
)
Run Code Online (Sandbox Code Playgroud)
但由于df有重复的Email列,如果我想选择此列,则会出错:
select(df, 'Email')
15/11/19 15:41:58 ERROR RBackendHandler: select on 1422 failed
Error in invokeJava(isStatic = FALSE, objId$id, methodName, ...) :
org.apache.spark.sql.AnalysisException: Reference 'Email' is ambiguous, could be: Email#350, Email#361.;
at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.resolve(LogicalPlan.scala:278)
...
Run Code Online (Sandbox Code Playgroud)
我想保留第一次出现的Email列并删除后者,我该怎么做?
我有以下简单SparkR程序,即从中创建SparkR DataFrame和检索/收集数据.
Sys.setenv(HADOOP_CONF_DIR = "/etc/hadoop/conf.cloudera.yarn")
Sys.setenv(SPARK_HOME = "/home/user/Downloads/spark-1.6.1-bin-hadoop2.6")
.libPaths(c(file.path(Sys.getenv("SPARK_HOME"), "R", "lib"), .libPaths()))
library(SparkR)
sc <- sparkR.init(master="yarn-client",sparkEnvir = list(spark.shuffle.service.enabled=TRUE,spark.dynamicAllocation.enabled=TRUE,spark.dynamicAllocation.initialExecutors="40"))
hiveContext <- sparkRHive.init(sc)
n = 1000
x = data.frame(id = 1:n, val = rnorm(n))
xs <- createDataFrame(hiveContext, x)
xs
head(xs)
collect(xs)
Run Code Online (Sandbox Code Playgroud)
我能够成功创建它并查看信息,但任何与获取数据相关的操作都会抛出错误.
16/07/25 16:33:59 WARN TaskSetManager:阶段17.0中丢失的任务0.3(TID 86,wlos06.nrm.minn.seagate.com):java.net.SocketTimeoutException:接受在java.net.PlainSocketImpl超时. java.net.ServerSocket.implAccept(ServerSocket.java:530)java.net.ServerSocket.accept(ServerSocket.java:498)java.net.AbstractPlainSocketImpl.accept(AbstractPlainSocketImpl.java:398)上的socketAccept(Native Method) org.apache.apark.api.r.RRDD $ .createRWorker(RRDD.scala:432)位于org.apache.spark的org.apache.spark.api.r.BaseRRDD.compute(RRDD.scala:63). rdd.RDD.computeOrReadCheckpoint(RDD.scala:306)atg.apache.spark.rdd.RDD.iterator(RDD.scala:270)org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)位于org.apache.spark.rdd.MapPartitionsRDD.compute的org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306)atg.apache.spark.rdd.RDD.iterator(RDD.scala:270) (MapPartitionsRDD.scala:38)位于org.apache.spark的org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306).rdd.RDD.iterator(RDD.scala:270)atg.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306) org.apache.spark.rdd.RDD.iterator(RDD.scala:270)at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66)at org.apache.spark.scheduler.Task.run (Task.scala:89)位于java.util的java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)的org.apache.spark.executor.Executor $ TaskRunner.run(Executor.scala:214). concurrent.ThreadPoolExecutor $ Worker.run(ThreadPoolExecutor.java:615)at java.lang.Thread.run(Thread.java:745)
16/07/25 16:33:59 ERROR TaskSetManager:阶段17.0中的任务0失败了4次; aborting job 16/07/25 16:33:59错误RBackendHandler:org.apache.spark.sql.api.r.SQLUtils上的dfToCols失败invokeJava中的错误(isStatic = TRUE,className,methodName,...):org. apache.spark.SparkException:作业因阶段失败而中止:阶段17.0中的任务0失败4次,最近失败:阶段17.0中丢失任务0.3(TID 86,wlos06.nrm.minn.seagate.com):java.net .SocketTimeoutException:在Java.net.Server.Socket.implAccept(ServerSocket.java:530)的java.net.AbstractPlainSocketImpl.accept(AbstractPlainSocketImpl.java:398)的java.net.PlainSocketImpl.socketAccept(Native …
如何在Spark中汇总多个列?例如,在SparkR中,以下代码用于获取一列的总和,但如果我尝试获取两列的总和,df则会出现错误.
# Create SparkDataFrame
df <- createDataFrame(faithful)
# Use agg to sum total waiting times
head(agg(df, totalWaiting = sum(df$waiting)))
##This works
# Use agg to sum total of waiting and eruptions
head(agg(df, total = sum(df$waiting, df$eruptions)))
##This doesn't work
Run Code Online (Sandbox Code Playgroud)
SparkR或PySpark代码都可以使用.
在我的 R 脚本中,我有一个SparkDataFrame包含四个不同月份数据的两列(时间、值)。由于我需要将我的函数分别应用到每个月,我想我会将repartition它分成四个分区,每个分区将保存一个月的数据。
我创建了一个名为 partition 的附加列,具有一个整数值 0 - 3,然后repartition通过此特定列调用该方法。
可悲的是,正如本主题中所描述的那样:
Spark SQL - df.repartition 和 DataFrameWriter partitionBy 之间的区别?,使用该repartition方法我们只确定所有具有相同键的数据最终会在同一个分区中,但是具有不同键的数据也可以最终在同一个分区中。
就我而言,执行下面可见的代码会创建 4 个分区,但只用数据填充其中的 2 个。
我想我应该使用该partitionBy方法,但是在 SparkR 的情况下,我不知道该怎么做。官方文档指出,此方法适用于称为WindowSpec而不是DataFrame.
我真的很感激这方面的一些帮助,因为我不知道如何将此方法合并到我的代码中。
sparkR.session(
master="local[*]", sparkConfig = list(spark.sql.shuffle.partitions="4"))
df <- as.DataFrame(inputDat) # this is a dataframe with added partition column
repartitionedDf <- repartition(df, col = df$partition)
schema <- structType(
structField("time", "timestamp"),
structField("value", "double"),
structField("partition", "string"))
processedDf <- dapply(repartitionedDf,
function(x) { data.frame(produceHourlyResults(x), …Run Code Online (Sandbox Code Playgroud) 这是我的玩具数据:
df <- tibble::tribble(
~var1, ~var2, ~var3, ~var4, ~var5, ~var6, ~var7,
"A", "C", 1L, 5L, "AA", "AB", 1L,
"A", "C", 2L, 5L, "BB", "AC", 2L,
"A", "D", 1L, 7L, "AA", "BC", 2L,
"A", "D", 2L, 3L, "BB", "CC", 1L,
"B", "C", 1L, 8L, "AA", "AB", 1L,
"B", "C", 2L, 6L, "BB", "AC", 2L,
"B", "D", 1L, 9L, "AA", "BC", 2L,
"B", "D", 2L, 6L, "BB", "CC", 1L)
Run Code Online (Sandbox Code Playgroud)
我在以下链接/sf/answers/3717723971/上的原始问题 是:
如何获得唯一标识数据框中观察结果的最小数量变量的组合,即哪些变量可以组成主键?以下答案/代码绝对正常,非常感谢thelatemail.
nms <- unlist(lapply(seq_len(length(df)), combn, x=names(df), simplify=FALSE), …Run Code Online (Sandbox Code Playgroud)