标签: apache-spark-sql

为什么我的 Spark 应用程序无法编译“对象 SparkSession 不是包的成员”但 spark-core 是依赖项?

我是 Spark 开发的新手,并尝试在 redhat linux 环境中使用 sbt 构建我的第一个 spark2(scala) 应用程序。下面是环境细节。

CDH Version: 5.11.0
Apache Spark2: 2.1.0.cloudera1
Scala Version: 2.11.11
Java Version: 1.7.0_101
Run Code Online (Sandbox Code Playgroud)

应用代码:

import org.apache.spark.sql
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.types._
import org.apache.spark.sql

object MySample {
def main(args: Array[String]) {

    val warehouseLocation = "file:${system:user.dir}/spark-warehouse"
    val spark = SparkSession
       .builder()
       .appName("FirstApplication")
       .config("spark.sql.warehouse.dir", warehouseLocation)
       .getOrCreate()

    val schPer = new StructType(Array(
    new StructField("Column1",IntegerType,false),
    new StructField("Column2",StringType,true),
    new StructField("Column3",StringType,true),
    new StructField("Column4",IntegerType,true)
    ))
val dfPeriod = spark.read.format("csv").option("header",false).schema(schPer).load("/prakash/periodFiles/")
dfPeriod.write.format("csv").save("/prakash/output/dfPeriod")
}
Run Code Online (Sandbox Code Playgroud)

}

使用 sbt 编译时出现以下错误。

$ sbt
[info] Loading project …
Run Code Online (Sandbox Code Playgroud)

hadoop scala sbt apache-spark apache-spark-sql

0
推荐指数
1
解决办法
2374
查看次数

多列上的 PySpark 数据框过滤器

使用 Spark 2.1.1

下面是我的数据框

id Name1   Name2

1 Naveen Srikanth 

2 Naveen Srikanth123

3 Naveen 

4 Srikanth Naveen
Run Code Online (Sandbox Code Playgroud)

现在需要根据两个条件过滤行,即 2 和 3 需要过滤掉,因为名称有数字的 123 和 3 有空值

使用下面的代码只过滤行 id 2

df.select("*").filter(df["Name2"].rlike("[0-9]")).show()
Run Code Online (Sandbox Code Playgroud)

被卡住以包含第二个条件。

python filter apache-spark-sql pyspark

0
推荐指数
1
解决办法
3万
查看次数

我们可以在不创建模式的情况下在 spark 数据框中加载分隔文本文件吗?

我有格式良好的文本文件,如波纹管。

TimeStamp|^|LineItem_organizationId|^|LineItem_lineItemId|^|StatementTypeCode|^|LineItemName|^|LocalLanguageLabel|^|FinancialConceptLocal|^|FinancialConceptGlobal|^|IsDimensional|^|InstrumentId|^|LineItemSequence|^|PhysicalMeasureId|^|FinancialConceptCodeGlobalSecondary|^|IsRangeAllowed|^|IsSegmentedByOrigin|^|SegmentGroupDescription|^|SegmentChildDescription|^|SegmentChildLocalLanguageLabel|^|LocalLanguageLabel.languageId|^|LineItemName.languageId|^|SegmentChildDescription.languageId|^|SegmentChildLocalLanguageLabel.languageId|^|SegmentGroupDescription.languageId|^|SegmentMultipleFundbDescription|^|SegmentMultipleFundbDescription.languageId|^|IsCredit|^|FinancialConceptLocalId|^|FinancialConceptGlobalId|^|FinancialConceptCodeGlobalSecondaryId|^|FFAction|!|
1506702452474|^|4295876606|^|1|^|BAL|^|Cash And Deposits|^|2|^||^|ACAE|^|false|^||^||^||^||^|false|^||^||^||^||^|505126|^|505074|^||^||^||^||^||^||^||^|3018759|^||^|I|!|
1506702452475|^|4295876606|^|4|^|BAL|^|Raw Materials And Supplies|^||^||^|AIRM|^|false|^||^||^||^||^|false|^||^||^||^||^|505126|^|505074|^||^||^||^||^||^||^||^|3018830|^||^|I|!|
1506702452476|^|4295876606|^|10|^|BAL|^|Total current assets|^||^||^|XTCA|^|false|^||^||^||^||^|false|^||^||^||^||^|505126|^|505074|^||^||^||^||^||^||^||^|3019590|^||^|I|!|
1506702452477|^|4295876606|^|53|^|BAL|^|Deferred Assets Total|^||^||^|ADFN|^|false|^||^||^||^||^|false|^||^||^||^||^|505126|^|505074|^||^||^||^||^||^||^||^|3014598|^||^|I|!|
1506702452478|^|4295876606|^|54|^|BAL|^|Total Assets|^||^||^|XTOT|^|false|^||^||^||^||^|false|^||^||^||^||^|505126|^|505074|^||^||^||^||^||^||^||^|3016350|^||^|I|!|
1506702452479|^|4295876606|^|107|^|BAL|^|Total Number Of Treasury Stock|^||^||^|XTCTI|^|false|^||^||^||^||^|false|^||^||^||^||^|505126|^|505074|^||^||^||^||^||^||^||^|3016331|^||^|I|!|
1506702452480|^|4295876606|^|108|^|BAL|^|Total Number Of Issued Shares|^||^||^|XTCII|^|false|^||^||^||^||^|false|^||^||^||^||^|505126|^|505074|^||^||^||^||^||^||^||^|3016326|^||^|I|!|
1506702452481|^|4295876606|^|109|^|BAL|^|Total Number Of Issued Preferred Stock A|^||^||^|XTPII|^|false|^||^||^||^||^|false|^||^||^||^||^|505126|^|505074|^||^||^||^||^||^||^||^|3016352|^||^|I|!|
1506702452482|^|4295876606|^|111|^|CAS|^|Loss before income taxes|^||^||^|ONET|^|false|^||^||^||^||^|false|^||^||^||^||^|505126|^|505074|^||^||^||^||^||^||^||^|3019196|^||^|I|!|
1506702452483|^|4295876606|^|130|^|CAS|^|Subtotal|^||^||^|FFFF|^|false|^||^||^||^||^|false|^||^||^||^||^|505126|^|505074|^||^||^||^||^||^||^||^|3014929|^||^|I|!|
1506702452484|^|4295876606|^|132|^|CAS|^|Net cash provided by (used in) operating activities|^||^||^|XTLO|^|false|^||^||^||^||^|false|^||^||^||^||^|505126|^|505074|^||^||^||^||^||^||^||^|3016344|^||^|I|!|
1506702452485|^|4295876606|^|133|^|CAS|^|Purchase of property, plant and equipment|^||^||^|ICEX|^|false|^||^||^||^||^|false|^||^||^||^||^|505126|^|505074|^||^||^||^||^||^||^||^|3014949|^||^|I|!|
1506702452486|^|4295876606|^|143|^|CAS|^|Net cash provided by (used in) investing activities|^||^||^|XTLI|^|false|^||^||^||^||^|false|^||^||^||^||^|505126|^|505074|^||^||^||^||^||^||^||^|3016342|^||^|I|!|
1506702452487|^|4295876606|^|145|^|CAS|^|Proceeds from long-term loans payable|^||^||^|FLDI|^|false|^||^||^||^||^|false|^||^||^||^||^|505126|^|505074|^||^||^||^||^||^||^||^|3014931|^||^|I|!|
Run Code Online (Sandbox Code Playgroud)

现在我必须将此文本文件加载到 spark 数据框中。

我可以这样做

val schema = StructType(Array(

      StructField("OrgId", StringType),
      StructField("LineItemId", StringType),
      StructField("SegmentId", StringType), …
Run Code Online (Sandbox Code Playgroud)

apache-spark apache-spark-sql spark-dataframe

0
推荐指数
1
解决办法
1万
查看次数

使用标题和数据文件创建带有标题的数据帧

我有两个文件 data.csv 和 headers.csv。我想在 Spark/Scala 中创建带有标题的数据框。

var data = spark.sqlContext.read.format(
  "com.databricks.spark.csv").option("header", "true"
).option("inferSchema", "true").load(data_path) 
Run Code Online (Sandbox Code Playgroud)

你能帮我自定义上面的行来做到这一点吗?

scala apache-spark apache-spark-sql

0
推荐指数
1
解决办法
1746
查看次数

Spark agg为多个列收集单个列表

这是我当前的代码:

pipe_exec_df_final_grouped = pipe_exec_df_final.groupBy("application_id").agg(collect_list("table_name").alias("tables"))
Run Code Online (Sandbox Code Playgroud)

但是,在我的收集列表中,我想要多个列值,因此聚合列将是一个数组数组。当前结果如下:

1|[a,b,c,d]
2|[e,f,g,h]
Run Code Online (Sandbox Code Playgroud)

但是,我还想保留另一个附加到聚集的列(我们称其为“状态”列名称)。所以我的新输出将是:

1|[[a,pass],[b,fail],[c,fail],[d,pass]]
...
Run Code Online (Sandbox Code Playgroud)

我试过collect_list("table_name, status") 但是collect_list只用一个列名。我该如何完成我想做的事情?

scala group-by apache-spark apache-spark-sql

0
推荐指数
1
解决办法
1133
查看次数

如何在Spark SQL中压缩两个数组列

我有一个熊猫数据框。我尝试将包含字符串值的两列首先连接到列表中,然后使用zip将列表中的每个元素都用'_'连接。我的数据集如下:

df['column_1']: 'abc, def, ghi'
df['column_2']: '1.0, 2.0, 3.0'
Run Code Online (Sandbox Code Playgroud)

我想将这两列连接到第三列,如下所示,用于数据框的每一行。

df['column_3']: [abc_1.0, def_2.0, ghi_3.0]
Run Code Online (Sandbox Code Playgroud)

我已经使用下面的代码在python中成功完成了此操作,但该数据框非常大,并且需要花费很长时间才能为整个数据框运行它。我想在PySpark中做同样的事情以提高效率。我已经成功读取了spark数据框中的数据,但是我很难确定如何使用PySpark等效函数复制Pandas函数。如何在PySpark中获得想要的结果?

df['column_3'] = df['column_2']
for index, row in df.iterrows():
  while index < 3:
    if isinstance(row['column_1'], str):      
      row['column_1'] = list(row['column_1'].split(','))
      row['column_2'] = list(row['column_2'].split(','))
      row['column_3'] = ['_'.join(map(str, i)) for i in zip(list(row['column_1']), list(row['column_2']))]
Run Code Online (Sandbox Code Playgroud)

我已使用以下代码将两列转换为PySpark中的数组

from pyspark.sql.types import ArrayType, IntegerType, StringType
from pyspark.sql.functions import col, split

crash.withColumn("column_1",
    split(col("column_1"), ",\s*").cast(ArrayType(StringType())).alias("column_1")
)
crash.withColumn("column_2",
    split(col("column_2"), ",\s*").cast(ArrayType(StringType())).alias("column_2")
)
Run Code Online (Sandbox Code Playgroud)

现在,我需要使用'_'在两列中压缩数组的每个元素。我该如何使用zip?任何帮助表示赞赏。

python pandas apache-spark apache-spark-sql pyspark

0
推荐指数
2
解决办法
3087
查看次数

spark:如何将行合并到jsons数组

输入:

id1   id2    name   value           epid
"xxx" "yyy"  "EAN"  "5057723043"    "1299"
"xxx" "yyy"  "MPN"  "EVBD"          "1299"
Run Code Online (Sandbox Code Playgroud)

我想要:

{         "id1": "xxx",
          "id2": "yyy",
          "item_specifics": [
            {
              "name": "EAN",
              "value": "5057723043"
            },
            {
              "name": "MPN",
              "value": "EVBD"
            },
            {
              "name": "EPID",
              "value": "1299"
            }
          ]
}
Run Code Online (Sandbox Code Playgroud)

我从如何将列聚合到json数组中尝试了以下两种解决方案以及如何将行合并为spark数据框的列作为有效json以将其写入mysql中

pi_df.groupBy(col("id1"), col("id2"))
  //.agg(collect_list(to_json(struct(col("name"), col("value"))).alias("item_specifics"))) // => not working
  .agg(collect_list(struct(col("name"),col("value"))).alias("item_specifics"))
Run Code Online (Sandbox Code Playgroud)

但是我得到了:

{ "name":"EAN","value":"5057723043", "EPID": "1299", "id1": "xxx", "id2": "yyy" }
Run Code Online (Sandbox Code Playgroud)

如何解决这个问题?谢谢

scala apache-spark apache-spark-sql

0
推荐指数
1
解决办法
574
查看次数

为什么在.distinct()之后总是有.collect()?

初学者火花。经常看到.distinct()。collect()结构。什么是在distinct()函数之后立即拥有collect()函数的内在原因是什么?

scala apache-spark apache-spark-sql pyspark

0
推荐指数
1
解决办法
122
查看次数

爆炸功能的反作用

在带有spark-2.4的scala中,我想过滤列中数组内部的值。

+---+------------+
| id|      letter|
+---+------------+
|  1|[x, xxx, xx]|
|  2|[yy, y, yyy]|
+---+------------+
Run Code Online (Sandbox Code Playgroud)

+---+-------+
| id| letter|
+---+-------+
|  1|[x, xx]|
|  2|[yy, y]|
+---+-------+
Run Code Online (Sandbox Code Playgroud)

我想到了使用explode+filter

val res = Seq(("1", Array("x", "xxx", "xx")), ("2", Array("yy", "y", "yyy"))).toDF("id", "letter")
res.withColumn("tmp", explode(col("letter"))).filter(length(col("tmp")) < 3).drop(col("letter")).show()
Run Code Online (Sandbox Code Playgroud)

我正在

+---+---+
| id|tmp|
+---+---+
|  1|  x|
|  1| xx|
|  2| yy|
|  2|  y|
+---+---+
Run Code Online (Sandbox Code Playgroud)

如何按ID zip / groupBy返回?

还是有更好,更优化的解决方案?

scala apache-spark apache-spark-sql

0
推荐指数
1
解决办法
90
查看次数

在scala中的数据框上应用groupBy和orderBy

我需要对列值进行排序并在数据框中分组另一列。

数据框中的数据如下所示。

+------------+---------+-----+
|      NUM_ID|    TIME |SIG_V|
+------------+---------+-----+
|XXXXX01     |167499000|55   |
|XXXXX02     |167499000|     |
|XXXXX01     |167503000|     |
|XXXXX02     |179810000| 81.0|
|XXXXX02     |179811000| 81.0|
|XXXXX01     |179833000|     |
|XXXXX02     |179833000|     |
|XXXXX02     |179841000| 81.0|
|XXXXX01     |179841000|     |
|XXXXX02     |179842000| 81.0|
|XXXXX03     |179843000| 87.0|
|XXXXX02     |179849000|     |
|XXXXX02     |179850000|     |
|XXXXX01     |179850000| 88.0|
|XXXXX01     |179857000|     |
|XXXXX01     |179858000|     |
|XXXXX01     |179865000|     |
|XXXXX03     |179865000|     |
|XXXXX02     |179870000|     |
|XXXXX02     |179871000| 11  |
+--------------------+-------+
Run Code Online (Sandbox Code Playgroud)

以上数据已按TIME列排序。

我的要求是将NUM_ID列分组,如下所示。

+------------+---------+-----+
|      NUM_ID|    TIME …
Run Code Online (Sandbox Code Playgroud)

scala dataframe apache-spark apache-spark-sql

0
推荐指数
1
解决办法
64
查看次数