有人可以解释我在Pigscript中得到以下输出
我的输入文件如下
A.TXT
aaa.kyl,data,data
bbb.kkk,data,data
cccccc.hj,data,data
qa.dff,data,data
Run Code Online (Sandbox Code Playgroud)
我正在写这样的猪脚本
A = LOAD 'a.txt' USING PigStorage(',') AS(a1:chararray,a2:chararray,a3:chararray);
B = FOREACH A GENERATE FLATTEN(STRSPLIT(a1)),a2,a3;
Run Code Online (Sandbox Code Playgroud)
我不知道如何继续这个...我需要像下面这样放.基本上我需要在第一个原子点符号后面的所有字符
(kyl,data,data)
(kkk,data,data)
(hj,data,data)
(dff,data,data)
Run Code Online (Sandbox Code Playgroud)
有人可以给我这个代码
Spark sql 窗口功能似乎无法正常工作。我正在 Hadoop 集群中运行 Spark 作业,其中 HDFS 块大小为 128 MB,Spark 版本 1.5 CDH 5.5
我正在读取 avro 文件并执行以下操作
我的要求:
如果有多条记录具有相同的 data_rfe_id,则根据最大 seq_id 和最大 service_id 取单个记录
我看到在原始数据中有些记录具有相同的 data_rfe_id 和相同的 seq_id 因此,我使用 Window 函数应用 row_number ,以便我可以使用 row_num === 1 过滤记录
我只想使用窗口函数来实现这一点。
为什么会出现这样的情况呢?
在数据框上应用窗口函数之前是否需要重新洗牌?
它仅针对某些任务抛出以下异常,并且在 4 次重复失败的任务后作业失败?
我们什么时候会遇到这种异常。
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.rowNumber
.....
scala> df.printSchema
root
|-- transitional_key: string (nullable = true)
|-- seq_id: string (nullable = true)
|-- data_rfe_id: string (nullable = true)
|-- service_id: string (nullable = true)
|-- event_start_date_time: string …Run Code Online (Sandbox Code Playgroud) 我的 Spark 应用程序读取 3 个 7 MB、40 MB、100MB 的文件以及如此多的转换并存储多个目录
Spark版本CDH1.5
MASTER_URL=yarn-cluster
NUM_EXECUTORS=15
EXECUTOR_MEMORY=4G
EXECUTOR_CORES=6
DRIVER_MEMORY=3G
Run Code Online (Sandbox Code Playgroud)
我的 Spark 作业运行了一段时间,然后抛出以下错误消息并从头开始重新启动
18/03/27 18:59:44 INFO avro.AvroRelation: using snappy for Avro output
18/03/27 18:59:47 ERROR yarn.ApplicationMaster: RECEIVED SIGNAL 15: SIGTERM
18/03/27 18:59:47 INFO CuratorFrameworkSingleton: Closing ZooKeeper client.
Run Code Online (Sandbox Code Playgroud)
再次重新启动后,它运行了一段时间并因此错误而失败
Application application_1521733534016_7233 failed 2 times due to AM Container for appattempt_1521733534016_7233_000002 exited with exitCode: -104
For more detailed output, check application tracking page:http://entline.com:8088/proxy/application_1521733534016_7233/Then, click on links to logs of each attempt.
Diagnostics: Container [pid=52716,containerID=container_e98_1521733534016_7233_02_000001] is running beyond …Run Code Online (Sandbox Code Playgroud) 可以使用以下命令将名为records.txt的文件从本地复制到HDFS
hadoop dfs -put /home/cloudera/localfiles/records.txt /user/cloudera/inputfiles
Run Code Online (Sandbox Code Playgroud)
通过使用上述命令,文件records.txt将被复制到同名的HDFS中。
但我想将两个文件(records1.txt 和 demo.txt)存储到 HDFS 中
我知道我们可以使用如下所示的东西
hadoop dfs -put /home/cloudera/localfiles/records* /user/cloudera/inputfiles
Run Code Online (Sandbox Code Playgroud)
但是有没有任何命令可以帮助我们存储一两个不同名称的文件并将其复制到 hdfs 中?
在Mapreduce中,我们说映射器产生的输出称为中间数据.
中间数据是否也被复制?
中间数据是暂时的吗?
中间数据何时被删除?是自动删除还是我们需要明确删除它?
我有一个有3个块的file.txt(块a,块b,块c).hadoop如何将这些块写入Cluster ..我的问题是hadoop是否遵循并行写入?或者块b是否必须等待块a写入群集?或阻止a和块b和块c平行写入hadoop簇...
我是斯卡拉的新手.
我想了解下面代码的语法
object PlainSimple {
def main(args:Array[String])
{
val m = add(5)
println(m(3))
}
def add(x:Int):Int=>Int =
{
y=>y+x
}
}
Run Code Online (Sandbox Code Playgroud)
我的问题是我们在哪里说add函数正在返回另一个函数?
什么Int=>Int意思?
在add函数里面叫做y什么?为什么我们在没有声明的情况下使用它呢?
如果想在add方法中添加多行,需要做什么?
我在spark变换函数中有一个简单的问题.
coalesce(numPartitions) - 将RDD中的分区数减少为numPartitions.过滤大型数据集后,可以更有效地运行操作.
val dataRDD = sc.textFile("/user/cloudera/inputfiles/records.txt")
val filterRDD = dataRDD.filter(record => record.split(0) == "USA")
val resizeRDD = filterRDD.coalesce(50)
val result = resizeRDD.collect
Run Code Online (Sandbox Code Playgroud)
我的问题是
coalesce(numPartitions)是否会从filterRDD中删除空分区?
合并(numPartitions)是否经过改组?
我想读取两个日期范围之间的所有日期,这个范围包括开始日期和结束日期
input_start_date="2013-09-05"
input_end_date="2013-09-10"
START_DATE=$(date -I -d "$input_start_date") || exit -1
END_DATE=$(date -I -d "$input_end_date") || exit -1
d="$START_DATE"
while [ "$d" <= "$END_DATE" ]; do
echo $d
d=$(date -I -d "$d + 1 day")
done
Run Code Online (Sandbox Code Playgroud)
但是当我运行上面的代码时,我得到以下错误
bash: = 2013-09-10: No such file or directory
Run Code Online (Sandbox Code Playgroud)
有人可以帮我解决这个问题吗
预期输出
2013-09-05
2013-09-06
2013-09-07
2013-09-08
2013-09-09
2013-09-10
Run Code Online (Sandbox Code Playgroud) val partitionsColumns = "idnum,monthnum"
val partitionsColumnsList = partitionsColumns.split(",").toList
val loc = "/data/omega/published/invoice"
val df = sqlContext.read.parquet(loc)
val windowFunction = Window.partitionBy (partitionsColumnsList:_*).orderBy(df("effective_date").desc)
Run Code Online (Sandbox Code Playgroud)
Run Code Online (Sandbox Code Playgroud)<console>:38: error: overloaded method value partitionBy with alternatives: (cols: org.apache.spark.sql.Column*) org.apache.spark.sql.expressions.WindowSpec <and> (colName: String,colNames: String*)org.apache.spark.sql.expressions.WindowSpec cannot be applied to (String) val windowFunction = Window.partitionBy(partitionsColumnsList:_*).orderBy(df("effective_date").desc)
是否可以将列列表发送到partitionBy方法 Spark/Scala?
我已经实现了将一列传递给有效的partitionBy方法。我不知道如何将多列传递给partitionByMethod
基本上我想传递List(Columns)给partitionBy方法
Spark 版本是 1.6。
apache-spark ×4
hadoop ×2
apache-pig ×1
bash ×1
hadoop-yarn ×1
hdfs ×1
linux ×1
mapreduce ×1
scala ×1