小编Sha*_*ala的帖子

在 Windows 中为 pyspark 设置的环境变量

我的笔记本电脑上安装了 Spark。我能够执行“spark-shell”命令并打开 scala shell,如下所示。 C:\Spark1_6\spark-1.6.0-bin-hadoop2.6\bin>spark-shell scala> 但是当我尝试执行pyspark命令时。 C:\Spark1_6\spark-1.6.0-bin-hadoop2.6\bin>pyspark

我收到以下错误消息。

'python' 未被识别为内部或外部命令

我确实手动设置了环境用户“路径”变量。通过附加

";C:\Python27"

我重新启动了笔记本电脑,但仍然出现相同的错误。任何人都可以帮我解决这个问题吗?我没有正确更新环境变量吗?

版本:Spark:1.6.2 Windows:8.1

environment-variables apache-spark pyspark

4
推荐指数
1
解决办法
9344
查看次数

填充Pyspark数据帧

我有一个Pyspark数据帧(原始数据帧)具有以下数据(所有列都有字符串数据类型):

  id           Value
   1             103
   2             1504
   3              1  
Run Code Online (Sandbox Code Playgroud)

我需要在value列中创建一个带有padding的新修改数据帧,这样该列的长度应为4个字符.如果length小于4个字符,则在数据中添加0,如下所示:

  id             Value
   1             0103
   2             1504
   3             0001  
Run Code Online (Sandbox Code Playgroud)

有人可以帮我吗?如何使用Pyspark数据框实现它?任何帮助将不胜感激.

pyspark spark-dataframe

3
推荐指数
2
解决办法
6010
查看次数

Scala根据时间列将单行拆分为多行

我有以下格式的数据框:

|u_name|Date        |Hour |  Content_id|WatchTime(sec)   |
|user1 | 2019-07-28 |  21 |        100 |           10800 |
|user2 | 2019-07-28 |  20 |        101 |            3600 | 
|user3 | 2019-07-28 |  21 |        202 |            7000 | 
Run Code Online (Sandbox Code Playgroud)

我需要将此数据帧转换为以下数据,基本上,我需要每小时创建一个条目,因此,如果WatchTime(sec)超过3600秒,则需要在下一个小时创建一个新条目

|u_name|Date        |Hour |  Content_id|WatchTime(sec)   |
|user1 | 2019-07-28 |  21 |        100 |            3600 |
|user1 | 2019-07-28 |  22 |        100 |            3600 |
|user1 | 2019-07-28 |  23 |        100 |            3600 |
|user2 | 2019-07-28 |  20 | …
Run Code Online (Sandbox Code Playgroud)

scala apache-spark

3
推荐指数
1
解决办法
78
查看次数

Pyspark从数据框中的列中删除空值

我的数据框如下所示

ID,FirstName,LastName

1,Navee,Srikanth

2,,Srikanth 

3,Naveen,
Run Code Online (Sandbox Code Playgroud)

现在我的问题陈述是,由于名字为空,我必须删除第2行。

我正在使用以下pyspark脚本

join_Df1= Name.filter(Name.col(FirstName).isnotnull()).show()
Run Code Online (Sandbox Code Playgroud)

我收到错误消息

  File "D:\0\NameValidation.py", line 13, in <module>
join_Df1= filter(Name.FirstName.isnotnull()).show()
Run Code Online (Sandbox Code Playgroud)

TypeError:“列”对象不可调用

谁能帮我解决这个问题

python hadoop mapreduce apache-spark pyspark

2
推荐指数
2
解决办法
2万
查看次数

如何在Apache Spark中获取上一行的数据

从Spark Data框架中查找每个城市的上个月销售额

|City|     Month   |Sale|
+----+----------- +----- +
|  c1|    JAN-2017|  49 |
|  c1|    FEB-2017|  46 |
|  c1|    MAR-2017|  83 |
|  c2|    JAN-2017|  59 |
|  c2|    MAY-2017|  60 |
|  c2|    JUN-2017|  49 |
|  c2|    JUL-2017|  73 |
+----+-----+----+-------
Run Code Online (Sandbox Code Playgroud)

所需的解决方案是

|City|     Month  |Sale   |previous_sale|
+----+-----+-------+-------------+--------
|  c1|    JAN-2017|  49|           NULL  |
|  c1|    FEB-2017|  46|           49    |
|  c1|    MAR-2017|  83|           46    |
|  c2|    JAN-2017|  59|           NULL  |
|  c2|    MAY-2017|  60|           59    | …
Run Code Online (Sandbox Code Playgroud)

scala dataframe apache-spark

2
推荐指数
1
解决办法
3698
查看次数

Spark 将列值拆分为多行

我的问题是我有一个这样的表:

------------------------
A  B    C
------------------------
a1 b2   c1|c2|c3|c4
Run Code Online (Sandbox Code Playgroud)

c1|c2|c3|c4 是一个由 | 分隔的值。

我的最终结果应该是这样的:

---------
A  B   C
---------
a1 b1  c1
a1 b1  c2
a1 b1  c3
a1 b1  c4
Run Code Online (Sandbox Code Playgroud)

我该怎么做呢?

谢谢

sql scala apache-spark

2
推荐指数
1
解决办法
4798
查看次数

嵌套 json 中的结构化流式传输不同模式

您好,我有一个场景,传入的消息是一个 Json,其标题为表名,数据部分包含表列数据。现在我想将其写入镶木地板到单独的文件夹,例如 /emp/dept。我可以通过根据表名聚合行来在常规流式传输中实现此目的。但在结构化流媒体中我无法分割它。我怎样才能在结构化流媒体中实现这一点。

{"tableName":"employee","data":{"empid":1","empname":"john","dept":"CS"} {"tableName":"employee","data": {"empid":2","empname":"james","dept":"CS"} {"tableName":"dept","data":{"dept":"1","deptname": "CS","desc":"计算机科学系"}

apache-spark spark-streaming apache-spark-sql spark-structured-streaming

2
推荐指数
1
解决办法
1214
查看次数

如何将 Spark 流输出转换为数据帧或存储在表中

我的代码是:

val lines = KafkaUtils.createStream(ssc, "localhost:2181", "spark-streaming-consumer-group", Map("hello" -> 5))
val data=lines.map(_._2)
data.print()
Run Code Online (Sandbox Code Playgroud)

我的输出有 50 个不同的值,格式如下

{"id:st04","data:26-02-2018 20:30:40","temp:30", "press:20"}
Run Code Online (Sandbox Code Playgroud)

任何人都可以帮助我将这些数据存储在表格形式中

| id |date               |temp|press|   
|st01|26-02-2018 20:30:40| 30 |20   |  
|st01|26-02-2018 20:30:45| 80 |70   |  
Run Code Online (Sandbox Code Playgroud)

我会非常感激。

scala apache-spark spark-streaming apache-spark-sql

1
推荐指数
1
解决办法
5926
查看次数

在 Spark 中使用数据类型 map&lt;string,bigint&gt; 将数据帧写入 csv

我有一个文件是 file1snappy.parquet。它有一个复杂的数据结构,比如地图,里面的数组。处理后我得到了最终结果。在将结果写入 csv 时,我收到一些错误消息

"Exception in thread "main" java.lang.UnsupportedOperationException: CSV data source does not support map<string,bigint> data type."
Run Code Online (Sandbox Code Playgroud)

我使用过的代码:

val conf=new SparkConf().setAppName("student-example").setMaster("local")
    val sc = new SparkContext(conf)
    val sqlcontext = new org.apache.spark.sql.SQLContext(sc)
    val datadf = sqlcontext.read.parquet("C:\\file1.snappy.parquet")
    def sumaggr=udf((aggr: Map[String, collection.mutable.WrappedArray[Long]]) => if (aggr.keySet.contains("aggr")) aggr("aggr").sum else 0)
datadf.select(col("neid"),sumaggr(col("marks")).as("sum")).filter(col("sum") =!= 0).show(false)
    datadf.write.format("com.databricks.spark.csv").option("header", "true").save("C:\\myfile.csv")
Run Code Online (Sandbox Code Playgroud)

我尝试转换 datadf.toString() 但我仍然面临同样的问题。如何将该结果写入 CSV。

apache-spark rdd spark-dataframe

1
推荐指数
1
解决办法
5444
查看次数

如何减去/添加一天至今

我正在尝试获取今天的日期并在 SCALA 中查找前 5 天的日期列表。例如,我必须从当前日期中减去 3 天,然后将该结果日期添加到列表中。

我怎样才能做到这一点?

import scala.collection.mutable.ArrayBuffer
val dateFormatter = new java.text.SimpleDateFormat("yyyy-MM-dd")
var today_date = new  java.util.Date()
var today = dateFormatter.format(today_date) 

var lst_5_days = ArrayBuffer[String]()
for(i <- 1 to 5)
{
  val prev_day= /* method to get`enter code here` date for previous day 
                 (today - i days) */
  lst_5_days +=prev_day
}
Run Code Online (Sandbox Code Playgroud)

scala date

1
推荐指数
1
解决办法
282
查看次数