我正在尝试利用spark分区.我试图做类似的事情
data.write.partitionBy("key").parquet("/location")
Run Code Online (Sandbox Code Playgroud)
这里的问题每个分区都会产生大量的镶木地板文件,如果我尝试从根目录中读取,会导致读取速度慢.
为了避免我试过
data.coalese(numPart).write.partitionBy("key").parquet("/location")
Run Code Online (Sandbox Code Playgroud)
但是,这会在每个分区中创建numPart数量的镶木地板文件.现在我的分区大小不同了.所以我理想的是希望每个分区有单独的合并.然而,这看起来并不容易.我需要访问所有分区合并到一定数量并存储在一个单独的位置.
写入后我应该如何使用分区来避免许多文件?
我有两个 spark 的数据框。其中之一使用 HiveContext 从 hive 表中收到:
spark_df1 = hc.sql("select * from testdb.titanic_pure_data_test")
Run Code Online (Sandbox Code Playgroud)
我从.csv文件中获得的第二个 spark 数据框:
lines = sc.textFile("hdfs://HDFS-1/home/testdb/1500000_Sales_Records.csv").map(lambda line: line.split(","))
spark_df_test = lines.toDF(['Region','Country','Item_Type','Sales_Channel','Order_Priority','Order_Date','Order_ID','Ship_Date','Units_Sold','Unit_Price','Unit_Cost','Total_Revenue','Total_Cost','Total_Profit'])`
Run Code Online (Sandbox Code Playgroud)
我想将任何数据框保存为配置单元表
spark_df1.write.mode("overwrite").format("orc").saveAsTable("testdb.new_res5")
保存的第一个数据帧没有问题,但是当我尝试以spark_df_test相同的方式保存第二个数据帧 ( ) 时,出现此错误
文件“/home/jup-user/testdb/scripts/caching.py”,第 90 行,在 spark_df_test.write.mode("overwrite").format("orc").saveAsTable("
testdb.new_res5") 文件 "/data_disk /opt/cloudera/parcels/CDH-5.15.1-1.cdh5.15.1.p0.4/lib/spark/python/lib/pyspark.zip/pyspark/sql/readwriter.py”,第 435 行,在 saveAsTable 文件中“/data_disk/opt/cloudera/parcels/CDH-5.15.1-1.cdh5.15.1.p0.4/lib/spark/python/lib/py4j-0.10.7-src.zip/py4j/java_gateway.py” ,第 1257 行,通话中 文件“/data_disk/opt/cloudera/parcels/CDH-5.15.1-1.cdh5.15.1.p0.4/lib/spark/python/lib/pyspark.zip/pyspark/sql/utils.py”,第51行, 在 deco pyspark.sql.utils.AnalysisException: '临时表不允许指定数据库名称或其他限定符。如果表名中包含点(.),请用反引号(`)引用表名;;'
我在阅读火花数据帧时发现了这个奇怪的问题.我将数据帧重新划分为50k分区.但是,当我在数据帧上读取并执行计数操作时,我发现当我使用spark 2.0时,底层rdd只有2143个分区.
所以我去了保存重新分区数据的路径并发现了
hfs -ls /repartitionedData/ | wc -l
50476
Run Code Online (Sandbox Code Playgroud)
因此它在保存数据的同时创建了50k的分区.
但是有了火花2.0,
val d = spark.read.parquet("repartitionedData")
d.rdd.getNumPartitions
res4: Int = 2143
Run Code Online (Sandbox Code Playgroud)
但是火花1.5,
val d = spark.read.parquet("repartitionedData")
d.rdd.partitions.length
res4: Int = 50474
Run Code Online (Sandbox Code Playgroud)
有人可以帮我弄这个吗?
hadoop bigdata apache-spark apache-spark-sql spark-dataframe
我是 scala 中的 apache spark sql 的新手。
如何在 Apache spark sql 数据帧中找到每行的大小并丢弃大小超过千字节阈值的行。我正在寻找 Scala 解决方案。
我目前正在编写一个脚本来尝试解决第13个Euler问题,但每当我运行它时它会产生以下错误.
线程"main"java.lang.NumberFormatException中的异常:对于java.base/java.lang.Long.parseLong中java.base/java.lang.NumberFormatException.forInputString(NumberFormatException.java:65)的输入字符串:"37107287533902102798797998220837590246510135740250" (Long.java:692)在java.base/java.lang.Long.parseLong(Long.java:817)的Script_013.main(Script_013.java:108)
进程以退出代码1结束
我不确定为什么会发生这种情况,因为我确信每个字符串都只包含数字.我.trim()在其他帖子上添加了很多评论的建议,但这似乎没有什么区别.
有谁知道为什么会这样.感谢所有帮助,谢谢!
以下是代码;
public class Script_013
{
public static void main (String [] args)
{
String numbers[] = { "37107287533902102798797998220837590246510135740250",
"46376937677490009712648124896970078050417018260538",
"74324986199524741059474233309513058123726617309629",
"91942213363574161572522430563301811072406154908250",
"23067588207539346171171980310421047513778063246676",
"89261670696623633820136378418383684178734361726757",
"28112879812849979408065481931592621691275889832738",
"44274228917432520321923589422876796487670272189318",
"47451445736001306439091167216856844588711603153276",
"70386486105843025439939619828917593665686757934951",
"62176457141856560629502157223196586755079324193331",
"64906352462741904929101432445813822663347944758178",
"92575867718337217661963751590579239728245598838407",
"58203565325359399008402633568948830189458628227828",
"80181199384826282014278194139940567587151170094390",
"35398664372827112653829987240784473053190104293586",
"86515506006295864861532075273371959191420517255829",
"71693888707715466499115593487603532921714970056938",
"54370070576826684624621495650076471787294438377604",
"53282654108756828443191190634694037855217779295145",
"36123272525000296071075082563815656710885258350721",
"45876576172410976447339110607218265236877223636045",
"17423706905851860660448207621209813287860733969412",
"81142660418086830619328460811191061556940512689692",
"51934325451728388641918047049293215058642563049483",
"62467221648435076201727918039944693004732956340691",
"15732444386908125794514089057706229429197107928209",
"55037687525678773091862540744969844508330393682126",
"18336384825330154686196124348767681297534375946515",
"80386287592878490201521685554828717201219257766954",
"78182833757993103614740356856449095527097864797581",
"16726320100436897842553539920931837441497806860984",
"48403098129077791799088218795327364475675590848030",
"87086987551392711854517078544161852424320693150332",
"59959406895756536782107074926966537676326235447210",
"69793950679652694742597709739166693763042633987085",
"41052684708299085211399427365734116182760315001271",
"65378607361501080857009149939512557028198746004375",
"35829035317434717326932123578154982629742552737307",
"94953759765105305946966067683156574377167401875275",
"88902802571733229619176668713819931811048770190271",
"25267680276078003013678680992525463401061632866526",
"36270218540497705585629946580636237993140746255962",
"24074486908231174977792365466257246923322810917141",
"91430288197103288597806669760892938638285025333403",
"34413065578016127815921815005561868836468420090470",
"23053081172816430487623791969842487255036638784583",
"11487696932154902810424020138335124462181441773470",
"63783299490636259666498587618221225225512486764533",
"67720186971698544312419572409913959008952310058822",
"95548255300263520781532296796249481641953868218774",
"76085327132285723110424803456124867697064507995236",
"37774242535411291684276865538926205024910326572967",
"23701913275725675285653248258265463092207058596522",
"29798860272258331913126375147341994889534765745501", …Run Code Online (Sandbox Code Playgroud) 我有需要从 spark 读取的镶木地板文件。有些文件缺少一些列,这些列存在于新文件中。
由于我不知道哪些文件缺少列,因此我需要读取 spark 中的所有文件。我有需要阅读的列列表。也可能是所有文件都缺少某些列。我需要在那些缺失的列中放置一个空值。
当我尝试做一个时,
sqlContext.sql('query')它给我错误说缺少列
如果我定义架构并执行
sqlContext.read.parquet('s3://....').schema(parquet_schema)
它给了我同样的错误。
在这里帮助我