我正在尝试org.apache.hadoop.fs.FileUtil.unTar
直接从 pyspark shell访问。
我知道我可以访问底层虚拟机(通过 py4j)sc._jvm
来做到这一点,但我很难真正连接到 hdfs(尽管我的 pyspark 会话完全没有其他功能,并且能够针对集群内的作业在集群中运行作业) .
例如:
hdpUntar = sc._jvm.org.apache.hadoop.fs.FileUtil.unTar
hdpFile = sc._jvm.java.io.File
root = hdpFile("hdfs://<url>/user/<file>")
target = hdpFile("hdfs://<url>/user/myuser/untar")
hdpUntar(root, target)
Run Code Online (Sandbox Code Playgroud)
不幸的是,这不起作用:
Py4JJavaError: An error occurred while calling z:org.apache.hadoop.fs.FileUtil.unTar.
: ExitCodeException exitCode=128: tar: Cannot connect to hdfs: resolve failed
at org.apache.hadoop.util.Shell.runCommand(Shell.java:538)
at org.apache.hadoop.util.Shell.run(Shell.java:455)
at org.apache.hadoop.util.Shell$ShellCommandExecutor.execute(Shell.java:715)
at org.apache.hadoop.fs.FileUtil.unTarUsingTar(FileUtil.java:675)
at org.apache.hadoop.fs.FileUtil.unTar(FileUtil.java:651)
at sun.reflect.GeneratedMethodAccessor12.invoke(Unknown Source)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:606)
at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:231)
at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:379)
at py4j.Gateway.invoke(Gateway.java:259)
at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:133)
at py4j.commands.CallCommand.execute(CallCommand.java:79)
at py4j.GatewayConnection.run(GatewayConnection.java:207)
at java.lang.Thread.run(Thread.java:745)
Run Code Online (Sandbox Code Playgroud) 如何在proc sql中获取观察的行号,类似于procsql中的datastep的_N_?
例如
proc sql outobs=5;
select case mod(<something>, 2)
when 0 then "EVEN"
else "ODD"
end
from maps.africa
end;
Run Code Online (Sandbox Code Playgroud)
想:
Row
----------
1 odd
2 even
3 odd
.
.
.
Run Code Online (Sandbox Code Playgroud) 我想创建一个离散变量,它将变量x除以1000的间隔.我所拥有的是以下内容:
DATA have;
INPUT x;
DATALINES;
1200
3200
5300
49
6500
;
RUN;
Run Code Online (Sandbox Code Playgroud)
我想要的是这样的
data want;
input x y $5-14;
DATALINES;
1200 1000-2000
3200 3000-4000
5300 5000-6000
0049 0000-1000
6500 6000-7000
;
RUN;
Run Code Online (Sandbox Code Playgroud)
我当然可以使用if else语句编写数据步骤来定义间隔,但是我有100个这样的间隔,所以这将是非常耗时的.
谢谢.
我试图将参数列表传递给函数.
scala> val a = Array("col1", "col2")
a: Array[String] = Array(col1, col2)
Run Code Online (Sandbox Code Playgroud)
我正在尝试使用:_*
符号,但它不起作用.我不能为我的生活找出原因!
val edges = all_edges.select(a:_*)
<console>:27: error: overloaded method value select with alternatives:
(col: String,cols: String*)org.apache.spark.sql.DataFrame <and>
(cols: org.apache.spark.sql.Column*)org.apache.spark.sql.DataFrame
cannot be applied to (String)
Run Code Online (Sandbox Code Playgroud)
但是,这确实有效:
val edges = all_edges.select("col1", "col2")
不确定它是否相关,但all_edges是一个火花数据框,我试图通过在列表中指定列来保留列.
scala> all_edges
res4: org.apache.spark.sql.DataFrame
Run Code Online (Sandbox Code Playgroud)
有任何想法吗?我一直试图从例如中找出语法.将List的元素作为参数传递给具有可变参数的函数,但似乎没有走得太远
编辑:刚刚找到如何在spark的数据框中"否定选择"列 - 但我很困惑为什么语法twocol.select(selectedCols.head, selectedCols.tail: _*)
是必要的?