SparklyR wrapper for Spark SQL: sqlContext.sql

Lev*_*man 2 r apache-spark apache-spark-sql r-dbi sparklyr

I am trying to write a wrapper around SQL functions for SparklyR. I created the following function:

sqlfunction <- function(sc, block) {
  spark_context(sc) %>%
    invoke("sqlContext.sql", block)
}

I then call it like this:

newsqlData <- sqlfunction(sc, "select
                          substr(V1,1,2),
                          substr(V1,3,3),
                          substr(V1,6,6),
                          substr(V1,12,4),
                          substr(V1,16,4)
                          FROM TABLE1 WHERE V1 IS NOT NULL")

But I get the following error:

Error: java.lang.IllegalArgumentException: invalid method sqlContext.sql for object 12
at sparklyr.Invoke$.invoke(invoke.scala:113)
at sparklyr.StreamHandler$.handleMethodCall(stream.scala:89)
at sparklyr.StreamHandler$.read(stream.scala:55)
at sparklyr.BackendHandler.channelRead0(handler.scala:49)
at sparklyr.BackendHandler.channelRead0(handler.scala:14)
at io.netty.channel.SimpleChannelInboundHandler.channelRead(SimpleChannelInboundHandler.java:105)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:308)
at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:294)
at io.netty.handler.codec.MessageToMessageDecoder.channelRead(MessageToMessageDecoder.java:103)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:308)
at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:294)
at io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:244)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:308)
at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:294)
at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:846)
at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:131)
at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:511)
at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:468)
at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:382)
at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:354)
at io.netty.util.concurrent.SingleThreadEventExecutor$2.run(SingleThreadEventExecutor.java:111)
at io.netty.util.concurrent.DefaultThreadFactory$DefaultRunnableDecorator.run(DefaultThreadFactory.java:137)
at java.lang.Thread.run(Thread.java:745)

Any suggestions or fixes would be greatly appreciated.

use*_*411 6

It should be:

sqlfunction <- function(sc, block) {
  spark_session(sc) %>% invoke("sql", block)
}

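where sc is a spark_connection (the output of spark_connect(master = master_url)).

This:

  • spark_session(sc) - retrieves the SparkSession from the connection object.
  • invoke("sql", block) - calls the sql method of the SparkSession instance, passing block as the argument.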

Example usage:

library(sparklyr)

sc <- spark_connect(master = "local[*]")
sqlfunction(sc, "SELECT SPLIT('foo,bar', ',')")
Output:
<jobj[11]>
  class org.apache.spark.sql.Dataset
  [split(foo,bar, ,): array<string>]

This gives you a reference to a Java object. If you want, you can, for example, register it as a temporary table:

... %>% invoke("createOrReplaceTempView", "some_name_for_the_view")

and access it with tbl:

library(dplyr)

tbl(sc, "some_name_for_the_view") 

or

... %>% sdf_register()

to get a tbl_spark object directly.
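The pieces above can be combined into a single helper. The following is only a minimal sketch: the name sql_to_tbl and its name argument are hypothetical and not part of sparklyr, and it assumes a sparklyr version in which spark_session and sdf_register are available.

```r
# Hypothetical helper (the name and arguments are illustrative, not sparklyr API).
# Runs a SQL string through the SparkSession and returns a tbl_spark.
sql_to_tbl <- function(sc, query, name = NULL) {
  # Fail early on anything that is not a single SQL string.
  stopifnot(is.character(query), length(query) == 1)

  # spark_session(sc) retrieves the SparkSession; invoke() calls its sql method
  # and returns a reference (jobj) to the resulting Dataset.
  sdf <- sparklyr::invoke(sparklyr::spark_session(sc), "sql", query)

  # sdf_register() wraps the Dataset reference as a tbl_spark,
  # optionally registering it under the given table name.
  sparklyr::sdf_register(sdf, name = name)
}

# Usage, assuming a local Spark installation is available:
# sc <- sparklyr::spark_connect(master = "local[*]")
# sql_to_tbl(sc, "SELECT SPLIT('foo,bar', ',') AS parts", name = "parts_view")
```

Returning a tbl_spark rather than the raw Java reference means the result can be passed straight to dplyr verbs.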

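The code you used:

  • spark_context - extracts the SparkContext instance.
  • invoke("sqlContext.sql", block) - tries to call a method that does not exist (sqlContext.sql).

In recent versions, you can simply use sdf_register in place of invoke("createOrReplaceTempView", ...).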