Posts by Mar*_*kus

AssertionError: col should be Column

How do I create a new column in PySpark and fill it with today's date?

This is what I tried:

import datetime
now = datetime.datetime.now()
df = df.withColumn("date", str(now)[:10])

I get this error:

AssertionError: col should be Column
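The assertion fires because withColumn expects its second argument to be a Column, not a plain Python string. A minimal sketch of the usual fix, wrapping the value in lit() (or letting Spark compute the date with current_date()); the Spark lines are comments because they assume an existing df and session:

```python
import datetime

# Hedged sketch of the fix, assuming an existing DataFrame `df`:
#
#   from pyspark.sql.functions import lit, current_date
#   df = df.withColumn("date", lit(str(datetime.datetime.now())[:10]))
#   df = df.withColumn("date", current_date())  # alternative: a DateType column
#
# The literal itself is plain Python:
today = str(datetime.datetime.now())[:10]
print(today)  # e.g. "2017-12-27"
```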

python apache-spark apache-spark-sql pyspark

11 votes · 1 answer · 20k views

Total size of serialized results of 16 tasks (1048.5 MB) is bigger than spark.driver.maxResultSize (1024.0 MB)

When I add --conf spark.driver.maxResultSize=2050 to my spark-submit command, I get the following error.

17/12/27 18:33:19 ERROR TransportResponseHandler: Still have 1 requests outstanding when connection from /XXX.XX.XXX.XX:36245 is closed
17/12/27 18:33:19 WARN Executor: Issue communicating with driver in heartbeater
org.apache.spark.SparkException: Exception thrown in awaitResult:
        at org.apache.spark.util.ThreadUtils$.awaitResult(ThreadUtils.scala:205)
        at org.apache.spark.rpc.RpcTimeout.awaitResult(RpcTimeout.scala:75)
        at org.apache.spark.rpc.RpcEndpointRef.askSync(RpcEndpointRef.scala:92)
        at org.apache.spark.executor.Executor.org$apache$spark$executor$Executor$$reportHeartBeat(Executor.scala:726)
        at org.apache.spark.executor.Executor$$anon$2$$anonfun$run$1.apply$mcV$sp(Executor.scala:755)
        at org.apache.spark.executor.Executor$$anon$2$$anonfun$run$1.apply(Executor.scala:755)
        at org.apache.spark.executor.Executor$$anon$2$$anonfun$run$1.apply(Executor.scala:755)
        at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1954)
        at org.apache.spark.executor.Executor$$anon$2.run(Executor.scala:755)
        at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
        at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308)
        at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180)
        at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
        at java.lang.Thread.run(Thread.java:748)
Caused by: java.io.IOException: Connection from /XXX.XX.XXX.XX:36245 closed
        at org.apache.spark.network.client.TransportResponseHandler.channelInactive(TransportResponseHandler.java:146)

The reason for adding this configuration was the error:

py4j.protocol.Py4JJavaError: An …
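A plausible direction (the values here are illustrative, not from the question): spark.driver.maxResultSize accepts a size with an explicit unit suffix, and raising it only helps if the driver JVM also has enough heap to hold the collected results; otherwise the driver can die and executors report connection errors like the ones above.

```shell
# Hedged sketch: explicit unit on the limit, plus more driver heap.
# `my_job.py` is a placeholder for the actual application.
spark-submit \
  --conf spark.driver.maxResultSize=2g \
  --driver-memory 8g \
  my_job.py
```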

python apache-spark pyspark spark-dataframe

10 votes · 2 answers · 10k views

How can I access the SparkContext from a SparkSession instance?

I import SparkSession in PySpark as follows:

from pyspark.sql import SparkSession

Then I create the SparkSession:

spark = SparkSession.builder.appName("test").getOrCreate()

and try to access the SparkContext:

spark.SparkContext.broadcast(...)

However, I get an error saying that SparkContext does not exist. How can I access it in order to set a broadcast variable?
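The context is exposed as the lowercase attribute sparkContext; spark.SparkContext (capital S) is not an attribute of the session, which is why the lookup fails. A small sketch; the live-session usage is in comments since it needs a running cluster:

```python
# Usage on a live session would look like:
#
#   spark = SparkSession.builder.appName("test").getOrCreate()
#   bv = spark.sparkContext.broadcast({"threshold": 10})
#   bv.value["threshold"]  # read the broadcast value back
#
try:
    from pyspark.sql import SparkSession
    # sparkContext is a property on the class, so no cluster is needed
    # to confirm the attribute exists
    found = hasattr(SparkSession, "sparkContext")
except ImportError:  # pyspark not installed in this environment
    found = True
print(found)
```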

python apache-spark pyspark

9 votes · 2 answers · 9,836 views

How to calculate the mean and standard deviation of a PySpark DataFrame?

My PySpark DataFrame (not pandas) called df is too large to use collect(). Therefore, the code given below is not efficient. It was working with a smaller amount of data, but now it fails.

import numpy as np

myList = df.collect()
total = []
for product,nb in myList:
    for p2,score in nb:
            total.append(score)
mean = np.mean(total)
std = np.std(total)

Is there any way to get mean and std as two variables by using pyspark.sql.functions or similar?

from pyspark.sql.functions import mean as mean_, std as std_

I could use withColumn, however, that approach applies the calculation row by row, and it does not return a single variable.

Update:

Sample content of df:

+----------+------------------+
|product_PK|          products|
+----------+------------------+
|       680|[[691,1], [692,5]]|
|       685|[[691,2], [692,2]]|
|       684|[[691,1], [692,3]]|

I should calculate the mean and standard deviation of the score values, e.g. the value 1 in [691,1] is one of the scores.
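For reference, in pyspark.sql.functions the names are stddev (sample) and stddev_pop (population, which is what np.std computes), not std, and a single agg over the exploded scores returns both statistics without collect(). A hedged sketch, assuming each element of products can be indexed with getItem (the Spark lines are comments since they need a session); the plain-Python part computes the same population statistics over the six sample scores:

```python
import math

# Hedged Spark sketch, assuming an existing `df` as in the sample above:
#
#   from pyspark.sql.functions import explode, col, mean, stddev_pop
#   row = (df.select(explode("products").alias("p"))
#            .select(col("p").getItem(1).alias("score"))   # 2nd element = score
#            .agg(mean("score").alias("m"), stddev_pop("score").alias("s"))
#            .first())
#   mean_val, std_val = row["m"], row["s"]
#
# The same statistics over the six scores in the sample:
scores = [1, 5, 2, 2, 1, 3]
mean_val = sum(scores) / len(scores)
std_val = math.sqrt(sum((x - mean_val) ** 2 for x in scores) / len(scores))
print(round(mean_val, 4), round(std_val, 4))
```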

python apache-spark apache-spark-sql pyspark

7 votes · 1 answer · 30k views

How to resolve org.apache.spark.sql.AnalysisException: Path does not exist

I use Spark 2.2.0.

I use Spark to process a dataset from S3. It worked fine until I decided to use a wildcard in order to read data from the subfolders of the folder test.

import org.apache.spark.sql.SparkSession

val path = "s3://data/test"
val spark = SparkSession
  .builder()
  .appName("Test")
  .config("spark.sql.warehouse.dir", path)
  .enableHiveSupport()
  .getOrCreate()

import spark.implicits._    
val myData = spark.read.parquet(path + "/*/")

I get the following error:

17/11/20 18:54:21 ERROR ApplicationMaster: User class threw exception: org.apache.spark.sql.AnalysisException: Path does not exist: hdfs://ip-111-112-11-65.eu-west-1.compute.internal:8020/user/hdfs/s3/data/test/20171120/*;

I execute the code above with the following command:

spark-submit --deploy-mode cluster --driver-memory 10g

I don't understand why Spark tries to read from HDFS and not from the provided path. The same piece of code works fine with another path, s3://data/test2/mytest.parquet.

scala amazon-s3 apache-spark

6 votes · 0 answers · 3,082 views

How to extract countries from text?

I use Python 3 (I have Python 2 installed as well) and I want to extract countries or cities from a short text, e.g. text = "I live in Spain" or text = "United States (New York), United Kingdom (London)".

Answers for countries:

  1. Spain
  2. [United States, United Kingdom]

I tried to install geography, but pip install geography fails. I get this error:

Collecting geography
  Could not find a version that satisfies the requirement geography (from versions: )
No matching distribution found for geography

It looks like geography only works with Python 2.

I also have geopandas, but I don't know how to use it to extract the required information from text.
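One dependency-free direction is plain dictionary matching: scan the text for known country names. A minimal sketch; the tiny hand-written COUNTRIES list below is a stand-in, and in practice the name list could come from a Python 3 friendly package such as pycountry.

```python
import re

# Stand-in country list for the sketch; a real one would be much longer.
COUNTRIES = ["Spain", "United States", "United Kingdom", "France"]

def extract_countries(text):
    # Word-boundary search keeps names from matching inside other words.
    return [c for c in COUNTRIES if re.search(r"\b" + re.escape(c) + r"\b", text)]

print(extract_countries("I live in Spain"))
print(extract_countries("United States (New York), United Kingdom (London)"))
```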

python geography nltk python-3.x

6 votes · 1 answer · 3,323 views

How to convert a list of dictionaries into a Pyspark DataFrame

I want to convert a list of dictionaries into a DataFrame. Here is the list:

mylist = 
[
  {"type_activity_id":1,"type_activity_name":"xxx"},
  {"type_activity_id":2,"type_activity_name":"yyy"},
  {"type_activity_id":3,"type_activity_name":"zzz"}
]

This is my code:

from pyspark.sql.types import StringType

df = spark.createDataFrame(mylist, StringType())

df.show(2,False)

+-----------------------------------------+
|                                    value|
+-----------------------------------------+
|{type_activity_id=1,type_activity_id=xxx}|
|{type_activity_id=2,type_activity_id=yyy}|
|{type_activity_id=3,type_activity_id=zzz}|
+-----------------------------------------+

I assume I should provide some mapping and types for each column, but I don't know how to do it.

Update:

I also tried this:

from pyspark.sql.types import ArrayType, StructType, StructField, IntegerType, StringType
from pyspark.sql.functions import from_json

schema = ArrayType(
    StructType([StructField("type_activity_id", IntegerType()),
                StructField("type_activity_name", StringType())
                ]))
df = spark.createDataFrame(mylist, StringType())
df = df.withColumn("value", from_json(df.value, schema))

But then I get null values:

+-----+
|value|
+-----+
| null|
| null|
+-----+
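The from_json attempt returns null because the list elements are already dicts, not JSON strings. A hedged sketch of the usual fix (it needs a session): pass a StructType schema directly, without the ArrayType wrapper, so each dict becomes one row.

```python
# Hedged Spark sketch, assuming an existing `spark` session:
#
#   from pyspark.sql.types import StructType, StructField, IntegerType, StringType
#   schema = StructType([
#       StructField("type_activity_id", IntegerType()),
#       StructField("type_activity_name", StringType()),
#   ])
#   df = spark.createDataFrame(mylist, schema)
#   df.show(3, False)
#
# The dict-to-row shaping itself is plain Python:
mylist = [
    {"type_activity_id": 1, "type_activity_name": "xxx"},
    {"type_activity_id": 2, "type_activity_name": "yyy"},
    {"type_activity_id": 3, "type_activity_name": "zzz"},
]
rows = [(d["type_activity_id"], d["type_activity_name"]) for d in mylist]
print(rows)
```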

python apache-spark-sql pyspark

6 votes · 3 answers · 7,762 views

Caused by: java.lang.NullPointerException at org.apache.spark.sql.Dataset

Below I provide my code. I iterate over the DataFrame prodRows and, for each product_PK, I find some matching sublist of product_PKs from prodRows.

  numRecProducts = 10
  var listOfProducts: Map[Long,Array[(Long, Int)]] = Map()
  prodRows.foreach{ row : Row =>
      val product_PK = row.get(row.fieldIndex("product_PK")).toString.toLong
      val gender = row.get(row.fieldIndex("gender_PK")).toString
      val selection = prodRows.filter($"gender_PK" === gender || $"gender_PK" === "UNISEX").limit(numRecProducts).select($"product_PK")
      var productList: Array[(Long, Int)] = Array()
      if (!selection.rdd.isEmpty()) {
        productList = selection.rdd.map(x => (x(0).toString.toLong,1)).collect()
      }
    listOfProducts = listOfProducts + (product_PK -> productList)
  }

But when I execute it, it gives me the following error. It looks like selection is empty in some iterations. However, I don't understand how to handle this error:

Driver stacktrace:
    at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1690)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1678)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1677)
    at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
    at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
    at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1677)
    at …

scala dataframe apache-spark apache-spark-sql

5 votes · 1 answer · 7,861 views

How to set yesterday's date using Scala?

I am creating a date in Scala:

  import java.text.SimpleDateFormat
  import java.util.{Date, TimeZone}

  val dafo = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm'Z'")
  val tz = TimeZone.getTimeZone("UTC")
  dafo.setTimeZone(tz)
  val endTime = dafo.format(new Date())

How can I set yesterday's date instead of today's date?
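One sketch that keeps the question's SimpleDateFormat: roll the clock back a day with Calendar before formatting (on Java 8+, java.time's ZonedDateTime.now(ZoneOffset.UTC).minusDays(1) would be the more modern route):

```scala
import java.text.SimpleDateFormat
import java.util.{Calendar, TimeZone}

val dafo = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm'Z'")
val tz = TimeZone.getTimeZone("UTC")
dafo.setTimeZone(tz)

// Calendar subtracts one day; getTime yields the java.util.Date that
// SimpleDateFormat expects.
val cal = Calendar.getInstance(tz)
cal.add(Calendar.DAY_OF_MONTH, -1)
val endTime = dafo.format(cal.getTime)
println(endTime)
```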

scala date simpledateformat

4 votes · 1 answer · 6,411 views

How can a function return multiple DataFrames in Scala?

I am writing a function that should return multiple DataFrames:

val df1, df2, df3 = getData(spark,df1,df2,df3)

def getData(spark: SparkSession, 
            path1: String, 
            path2: String,
            path3: String) : DataFrame = {

  val epoch = System.currentTimeMillis() / 1000

  val df1 = spark.read.parquet(path1)
  val df2 = spark.read.parquet(path2)
  val df3 = spark.read.parquet(path3)

  df1, df2, df3
}

However, I get a compilation error saying that df1, df2, df3 cannot be returned.
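In Scala, multiple results come back as a tuple: declare the return type as (DataFrame, DataFrame, DataFrame), return (df1, df2, df3) in parentheses, and destructure at the call site with val (df1, df2, df3) = .... A runnable sketch with Int placeholders standing in for the DataFrames, so it needs no Spark session:

```scala
// Tuple return + destructuring; with Spark, the return type would be
// (DataFrame, DataFrame, DataFrame) and the body three
// spark.read.parquet(...) calls ending in (df1, df2, df3).
def getData(path1: String, path2: String, path3: String): (Int, Int, Int) = {
  (path1.length, path2.length, path3.length)
}

val (df1, df2, df3) = getData("a", "bb", "ccc")
println(s"$df1 $df2 $df3")
```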

scala apache-spark apache-spark-sql

4 votes · 1 answer · 5,119 views