我是 Spark 开发的新手,并尝试在 redhat linux 环境中使用 sbt 构建我的第一个 spark2(scala) 应用程序。下面是环境细节。
CDH Version: 5.11.0
Apache Spark2: 2.1.0.cloudera1
Scala Version: 2.11.11
Java Version: 1.7.0_101
Run Code Online (Sandbox Code Playgroud)
应用代码:
import org.apache.spark.sql
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.types._
import org.apache.spark.sql
object MySample {
def main(args: Array[String]) {
val warehouseLocation = "file:${system:user.dir}/spark-warehouse"
val spark = SparkSession
.builder()
.appName("FirstApplication")
.config("spark.sql.warehouse.dir", warehouseLocation)
.getOrCreate()
val schPer = new StructType(Array(
new StructField("Column1",IntegerType,false),
new StructField("Column2",StringType,true),
new StructField("Column3",StringType,true),
new StructField("Column4",IntegerType,true)
))
val dfPeriod = spark.read.format("csv").option("header",false).schema(schPer).load("/prakash/periodFiles/")
dfPeriod.write.format("csv").save("/prakash/output/dfPeriod")
}
Run Code Online (Sandbox Code Playgroud)
}
使用 sbt 编译时出现以下错误。
$ sbt
[info] Loading project …Run Code Online (Sandbox Code Playgroud) 使用 Spark 2.1.1
下面是我的数据框
id Name1 Name2
1 Naveen Srikanth
2 Naveen Srikanth123
3 Naveen
4 Srikanth Naveen
Run Code Online (Sandbox Code Playgroud)
现在需要根据两个条件过滤行,即 2 和 3 需要过滤掉,因为名称有数字的 123 和 3 有空值
使用下面的代码只过滤行 id 2
df.select("*").filter(df["Name2"].rlike("[0-9]")).show()
Run Code Online (Sandbox Code Playgroud)
被卡住以包含第二个条件。
我有格式良好的文本文件,如波纹管。
TimeStamp|^|LineItem_organizationId|^|LineItem_lineItemId|^|StatementTypeCode|^|LineItemName|^|LocalLanguageLabel|^|FinancialConceptLocal|^|FinancialConceptGlobal|^|IsDimensional|^|InstrumentId|^|LineItemSequence|^|PhysicalMeasureId|^|FinancialConceptCodeGlobalSecondary|^|IsRangeAllowed|^|IsSegmentedByOrigin|^|SegmentGroupDescription|^|SegmentChildDescription|^|SegmentChildLocalLanguageLabel|^|LocalLanguageLabel.languageId|^|LineItemName.languageId|^|SegmentChildDescription.languageId|^|SegmentChildLocalLanguageLabel.languageId|^|SegmentGroupDescription.languageId|^|SegmentMultipleFundbDescription|^|SegmentMultipleFundbDescription.languageId|^|IsCredit|^|FinancialConceptLocalId|^|FinancialConceptGlobalId|^|FinancialConceptCodeGlobalSecondaryId|^|FFAction|!|
1506702452474|^|4295876606|^|1|^|BAL|^|Cash And Deposits|^|2|^||^|ACAE|^|false|^||^||^||^||^|false|^||^||^||^||^|505126|^|505074|^||^||^||^||^||^||^||^|3018759|^||^|I|!|
1506702452475|^|4295876606|^|4|^|BAL|^|Raw Materials And Supplies|^||^||^|AIRM|^|false|^||^||^||^||^|false|^||^||^||^||^|505126|^|505074|^||^||^||^||^||^||^||^|3018830|^||^|I|!|
1506702452476|^|4295876606|^|10|^|BAL|^|Total current assets|^||^||^|XTCA|^|false|^||^||^||^||^|false|^||^||^||^||^|505126|^|505074|^||^||^||^||^||^||^||^|3019590|^||^|I|!|
1506702452477|^|4295876606|^|53|^|BAL|^|Deferred Assets Total|^||^||^|ADFN|^|false|^||^||^||^||^|false|^||^||^||^||^|505126|^|505074|^||^||^||^||^||^||^||^|3014598|^||^|I|!|
1506702452478|^|4295876606|^|54|^|BAL|^|Total Assets|^||^||^|XTOT|^|false|^||^||^||^||^|false|^||^||^||^||^|505126|^|505074|^||^||^||^||^||^||^||^|3016350|^||^|I|!|
1506702452479|^|4295876606|^|107|^|BAL|^|Total Number Of Treasury Stock|^||^||^|XTCTI|^|false|^||^||^||^||^|false|^||^||^||^||^|505126|^|505074|^||^||^||^||^||^||^||^|3016331|^||^|I|!|
1506702452480|^|4295876606|^|108|^|BAL|^|Total Number Of Issued Shares|^||^||^|XTCII|^|false|^||^||^||^||^|false|^||^||^||^||^|505126|^|505074|^||^||^||^||^||^||^||^|3016326|^||^|I|!|
1506702452481|^|4295876606|^|109|^|BAL|^|Total Number Of Issued Preferred Stock A|^||^||^|XTPII|^|false|^||^||^||^||^|false|^||^||^||^||^|505126|^|505074|^||^||^||^||^||^||^||^|3016352|^||^|I|!|
1506702452482|^|4295876606|^|111|^|CAS|^|Loss before income taxes|^||^||^|ONET|^|false|^||^||^||^||^|false|^||^||^||^||^|505126|^|505074|^||^||^||^||^||^||^||^|3019196|^||^|I|!|
1506702452483|^|4295876606|^|130|^|CAS|^|Subtotal|^||^||^|FFFF|^|false|^||^||^||^||^|false|^||^||^||^||^|505126|^|505074|^||^||^||^||^||^||^||^|3014929|^||^|I|!|
1506702452484|^|4295876606|^|132|^|CAS|^|Net cash provided by (used in) operating activities|^||^||^|XTLO|^|false|^||^||^||^||^|false|^||^||^||^||^|505126|^|505074|^||^||^||^||^||^||^||^|3016344|^||^|I|!|
1506702452485|^|4295876606|^|133|^|CAS|^|Purchase of property, plant and equipment|^||^||^|ICEX|^|false|^||^||^||^||^|false|^||^||^||^||^|505126|^|505074|^||^||^||^||^||^||^||^|3014949|^||^|I|!|
1506702452486|^|4295876606|^|143|^|CAS|^|Net cash provided by (used in) investing activities|^||^||^|XTLI|^|false|^||^||^||^||^|false|^||^||^||^||^|505126|^|505074|^||^||^||^||^||^||^||^|3016342|^||^|I|!|
1506702452487|^|4295876606|^|145|^|CAS|^|Proceeds from long-term loans payable|^||^||^|FLDI|^|false|^||^||^||^||^|false|^||^||^||^||^|505126|^|505074|^||^||^||^||^||^||^||^|3014931|^||^|I|!|
Run Code Online (Sandbox Code Playgroud)
现在我必须将此文本文件加载到 spark 数据框中。
我可以这样做
val schema = StructType(Array(
StructField("OrgId", StringType),
StructField("LineItemId", StringType),
StructField("SegmentId", StringType), …Run Code Online (Sandbox Code Playgroud) 我有两个文件 data.csv 和 headers.csv。我想在 Spark/Scala 中创建带有标题的数据框。
var data = spark.sqlContext.read.format(
"com.databricks.spark.csv").option("header", "true"
).option("inferSchema", "true").load(data_path)
Run Code Online (Sandbox Code Playgroud)
你能帮我自定义上面的行来做到这一点吗?
这是我当前的代码:
pipe_exec_df_final_grouped = pipe_exec_df_final.groupBy("application_id").agg(collect_list("table_name").alias("tables"))
Run Code Online (Sandbox Code Playgroud)
但是,在我的收集列表中,我想要多个列值,因此聚合列将是一个数组数组。当前结果如下:
1|[a,b,c,d]
2|[e,f,g,h]
Run Code Online (Sandbox Code Playgroud)
但是,我还想保留另一个附加到聚集的列(我们称其为“状态”列名称)。所以我的新输出将是:
1|[[a,pass],[b,fail],[c,fail],[d,pass]]
...
Run Code Online (Sandbox Code Playgroud)
我试过collect_list("table_name, status") 但是collect_list只用一个列名。我该如何完成我想做的事情?
我有一个熊猫数据框。我尝试将包含字符串值的两列首先连接到列表中,然后使用zip将列表中的每个元素都用'_'连接。我的数据集如下:
df['column_1']: 'abc, def, ghi'
df['column_2']: '1.0, 2.0, 3.0'
Run Code Online (Sandbox Code Playgroud)
我想将这两列连接到第三列,如下所示,用于数据框的每一行。
df['column_3']: [abc_1.0, def_2.0, ghi_3.0]
Run Code Online (Sandbox Code Playgroud)
我已经使用下面的代码在python中成功完成了此操作,但该数据框非常大,并且需要花费很长时间才能为整个数据框运行它。我想在PySpark中做同样的事情以提高效率。我已经成功读取了spark数据框中的数据,但是我很难确定如何使用PySpark等效函数复制Pandas函数。如何在PySpark中获得想要的结果?
df['column_3'] = df['column_2']
for index, row in df.iterrows():
while index < 3:
if isinstance(row['column_1'], str):
row['column_1'] = list(row['column_1'].split(','))
row['column_2'] = list(row['column_2'].split(','))
row['column_3'] = ['_'.join(map(str, i)) for i in zip(list(row['column_1']), list(row['column_2']))]
Run Code Online (Sandbox Code Playgroud)
我已使用以下代码将两列转换为PySpark中的数组
from pyspark.sql.types import ArrayType, IntegerType, StringType
from pyspark.sql.functions import col, split
crash.withColumn("column_1",
split(col("column_1"), ",\s*").cast(ArrayType(StringType())).alias("column_1")
)
crash.withColumn("column_2",
split(col("column_2"), ",\s*").cast(ArrayType(StringType())).alias("column_2")
)
Run Code Online (Sandbox Code Playgroud)
现在,我需要使用'_'在两列中压缩数组的每个元素。我该如何使用zip?任何帮助表示赞赏。
输入:
id1 id2 name value epid
"xxx" "yyy" "EAN" "5057723043" "1299"
"xxx" "yyy" "MPN" "EVBD" "1299"
Run Code Online (Sandbox Code Playgroud)
我想要:
{ "id1": "xxx",
"id2": "yyy",
"item_specifics": [
{
"name": "EAN",
"value": "5057723043"
},
{
"name": "MPN",
"value": "EVBD"
},
{
"name": "EPID",
"value": "1299"
}
]
}
Run Code Online (Sandbox Code Playgroud)
我从如何将列聚合到json数组中尝试了以下两种解决方案?以及如何将行合并为spark数据框的列作为有效json以将其写入mysql中:
pi_df.groupBy(col("id1"), col("id2"))
//.agg(collect_list(to_json(struct(col("name"), col("value"))).alias("item_specifics"))) // => not working
.agg(collect_list(struct(col("name"),col("value"))).alias("item_specifics"))
Run Code Online (Sandbox Code Playgroud)
但是我得到了:
{ "name":"EAN","value":"5057723043", "EPID": "1299", "id1": "xxx", "id2": "yyy" }
Run Code Online (Sandbox Code Playgroud)
如何解决这个问题?谢谢
初学者火花。经常看到.distinct()。collect()结构。什么是在distinct()函数之后立即拥有collect()函数的内在原因是什么?
在带有spark-2.4的scala中,我想过滤列中数组内部的值。
从
+---+------------+
| id| letter|
+---+------------+
| 1|[x, xxx, xx]|
| 2|[yy, y, yyy]|
+---+------------+
Run Code Online (Sandbox Code Playgroud)
至
+---+-------+
| id| letter|
+---+-------+
| 1|[x, xx]|
| 2|[yy, y]|
+---+-------+
Run Code Online (Sandbox Code Playgroud)
我想到了使用explode+filter
val res = Seq(("1", Array("x", "xxx", "xx")), ("2", Array("yy", "y", "yyy"))).toDF("id", "letter")
res.withColumn("tmp", explode(col("letter"))).filter(length(col("tmp")) < 3).drop(col("letter")).show()
Run Code Online (Sandbox Code Playgroud)
我正在
+---+---+
| id|tmp|
+---+---+
| 1| x|
| 1| xx|
| 2| yy|
| 2| y|
+---+---+
Run Code Online (Sandbox Code Playgroud)
如何按ID zip / groupBy返回?
还是有更好,更优化的解决方案?
我需要对列值进行排序并在数据框中分组另一列。
数据框中的数据如下所示。
+------------+---------+-----+
| NUM_ID| TIME |SIG_V|
+------------+---------+-----+
|XXXXX01 |167499000|55 |
|XXXXX02 |167499000| |
|XXXXX01 |167503000| |
|XXXXX02 |179810000| 81.0|
|XXXXX02 |179811000| 81.0|
|XXXXX01 |179833000| |
|XXXXX02 |179833000| |
|XXXXX02 |179841000| 81.0|
|XXXXX01 |179841000| |
|XXXXX02 |179842000| 81.0|
|XXXXX03 |179843000| 87.0|
|XXXXX02 |179849000| |
|XXXXX02 |179850000| |
|XXXXX01 |179850000| 88.0|
|XXXXX01 |179857000| |
|XXXXX01 |179858000| |
|XXXXX01 |179865000| |
|XXXXX03 |179865000| |
|XXXXX02 |179870000| |
|XXXXX02 |179871000| 11 |
+--------------------+-------+
Run Code Online (Sandbox Code Playgroud)
以上数据已按TIME列排序。
我的要求是将NUM_ID列分组,如下所示。
+------------+---------+-----+
| NUM_ID| TIME …Run Code Online (Sandbox Code Playgroud)