小编Lev*_*man的帖子

在Sparklyr中指定col类型(spark_read_csv)

我正在使用SpraklyR在csv中读到spark

schema <- structType(structField("TransTime", "array<timestamp>", TRUE),
                 structField("TransDay", "Date", TRUE))

 spark_read_csv(sc, filename, "path", infer_schema = FALSE, schema = schema)
Run Code Online (Sandbox Code Playgroud)

但得到:

Error: could not find function "structType"
Run Code Online (Sandbox Code Playgroud)

如何使用spark_read_csv指定colunm类型?

提前致谢.

r sparklyr

4
推荐指数
2
解决办法
1673
查看次数

用于spark SQL的SparklyR包装器:sqlContext.sql

我正在尝试为SparklyR编写SQL函数的包装器.我创建了以下函数:

sqlfunction <- function(sc, block) {
  spark_context(sc) %>% 
invoke("sqlContext.sql", block) }
Run Code Online (Sandbox Code Playgroud)

然后我用以下方法调用它:

newsqlData <- sqlfunction(sc, "select
                          substr(V1,1,2),
                          substr(V1,3,3),
                          substr(V1,6,6),
                          substr(V1,12,4),
                          substr(V1,16,4)
                          FROM TABLE1 WHERE V1 IS NOT NULL")
Run Code Online (Sandbox Code Playgroud)

但是我收到以下错误:

Error: java.lang.IllegalArgumentException: invalid method sqlContext.sql for object 12
at sparklyr.Invoke$.invoke(invoke.scala:113)
at sparklyr.StreamHandler$.handleMethodCall(stream.scala:89)
at sparklyr.StreamHandler$.read(stream.scala:55)
at sparklyr.BackendHandler.channelRead0(handler.scala:49)
at sparklyr.BackendHandler.channelRead0(handler.scala:14)
at io.netty.channel.SimpleChannelInboundHandler.channelRead(SimpleChannelInboundHandler.java:105)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:308)
at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:294)
at io.netty.handler.codec.MessageToMessageDecoder.channelRead(MessageToMessageDecoder.java:103)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:308)
at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:294)
at io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:244)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:308)
at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:294)
at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:846)
at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:131)
at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:511)
at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:468)
at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:382)
at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:354)
at io.netty.util.concurrent.SingleThreadEventExecutor$2.run(SingleThreadEventExecutor.java:111)
at …
Run Code Online (Sandbox Code Playgroud)

r apache-spark apache-spark-sql r-dbi sparklyr

2
推荐指数
1
解决办法
1479
查看次数

用于大型数据集的 sparklyr 中 copy_to 的替代方法

我有下面的代码,它使用使用 Sparklyr 调用 spark SQL API 的包装函数对数据集进行 SQL 转换。然后我使用“invoke(“createOrReplaceTempView”,“name”)”将 Spark 环境中的表保存为 spark 数据框,以便我可以在以后的函数调用中调用。然后我使用 dplyr 代码“mutate”调用配置单元函数“regexp_replace”将字母转换为数字(0)。我他们需要再次调用一个 SQL 函数。

但是,这样做我似乎必须使用 sparklyr 中的“copy_to”函数。在大型数据集上,“copy_to”函数会导致以下错误:

Error: org.apache.spark.SparkException: Job aborted due to stage
failure: Total size of serialized results of 6 tasks (1024.6 MB) is
bigger than spark.driver.maxResultSize (1024.0 MB)
Run Code Online (Sandbox Code Playgroud)

有没有替代“copy_to”的方法,它允许我获得一个火花数据框,然后我可以用 SQL API 调用它?

这是我的代码L

 sqlfunction <- function(sc, block) {
  spark_session(sc) %>% invoke("sql", block)
 } 

sqlfunction(sc, "SELECT * FROM 
test")%>%invoke("createOrReplaceTempView", 
"name4")

names<-tbl(sc, "name4")%>% 
  mutate(col3 = regexp_replace(col2, "^[A-Z]", "0"))

copy_to(sc, names, overwrite = TRUE)

sqlfunction(sc, "SELECT * …
Run Code Online (Sandbox Code Playgroud)

hive r apache-spark-sql sparklyr

2
推荐指数
1
解决办法
1175
查看次数

使用hive命令更改DF中的字符串,使用sparklyr更改mutate

使用Hive命令regexp_extract我试图更改以下字符串:

201703170455 to 2017-03-17:04:55
Run Code Online (Sandbox Code Playgroud)

来自:

2017031704555675 to 2017-03-17:04:55.0010
Run Code Online (Sandbox Code Playgroud)

我在sparklyr中尝试使用这个与R中的gsub一起使用的代码:

  newdf<-df%>%mutate(Time1 = regexp_extract(Time, "(....)(..)(..)(..)(..)", "\\1-\\2-\\3:\\4:\\5"))
Run Code Online (Sandbox Code Playgroud)

而这段代码:

newdf<-df%>mutate(TimeTrans = regexp_extract("(....)(..)(..)(..)(..)(....)", "\\1-\\2-\\3:\\4:\\5.\\6"))
Run Code Online (Sandbox Code Playgroud)

但根本不起作用.有关如何使用regexp_extract执行此操作的任何建议?

hive r gsub apache-spark sparklyr

1
推荐指数
1
解决办法
244
查看次数

标签 统计

r ×4

sparklyr ×4

apache-spark ×2

apache-spark-sql ×2

hive ×2

gsub ×1

r-dbi ×1