How do I create a new column in PySpark and fill it with today's date?
Here is what I tried:
import datetime
now = datetime.datetime.now()
df = df.withColumn("date", str(now)[:10])
I get this error:
AssertionError: col should be Column
When I add --conf spark.driver.maxResultSize=2050 to my spark-submit command, I get the following error.
17/12/27 18:33:19 ERROR TransportResponseHandler: Still have 1 requests outstanding when connection from /XXX.XX.XXX.XX:36245 is closed
17/12/27 18:33:19 WARN Executor: Issue communicating with driver in heartbeater
org.apache.spark.SparkException: Exception thrown in awaitResult:
at org.apache.spark.util.ThreadUtils$.awaitResult(ThreadUtils.scala:205)
at org.apache.spark.rpc.RpcTimeout.awaitResult(RpcTimeout.scala:75)
at org.apache.spark.rpc.RpcEndpointRef.askSync(RpcEndpointRef.scala:92)
at org.apache.spark.executor.Executor.org$apache$spark$executor$Executor$$reportHeartBeat(Executor.scala:726)
at org.apache.spark.executor.Executor$$anon$2$$anonfun$run$1.apply$mcV$sp(Executor.scala:755)
at org.apache.spark.executor.Executor$$anon$2$$anonfun$run$1.apply(Executor.scala:755)
at org.apache.spark.executor.Executor$$anon$2$$anonfun$run$1.apply(Executor.scala:755)
at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1954)
at org.apache.spark.executor.Executor$$anon$2.run(Executor.scala:755)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308)
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180)
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
Caused by: java.io.IOException: Connection from /XXX.XX.XXX.XX:36245 closed
at org.apache.spark.network.client.TransportResponseHandler.channelInactive(TransportResponseHandler.java:146)
The reason for adding this configuration was the error:
py4j.protocol.Py4JJavaError: An …

I import SparkSession in PySpark as follows:
from pyspark.sql import SparkSession
Then I create the SparkSession:
spark = SparkSession.builder.appName("test").getOrCreate()
and try to access the SparkContext:
spark.SparkContext.broadcast(...)
However, I get an error saying that SparkContext does not exist. How can I access it in order to set a broadcast variable?
The PySpark DataFrame (not pandas) that I call df is too large for collect(). Therefore, the code given below is not efficient. It was working with a small amount of data, but now it fails.
import numpy as np

myList = df.collect()
total = []
for product, nb in myList:
    for p2, score in nb:
        total.append(score)
mean = np.mean(total)
std = np.std(total)
Is there a way to get mean and std as two variables by using pyspark.sql.functions or something similar?
from pyspark.sql.functions import mean as mean_, std as std_
I can apply these calculations row by row using withColumn, but that approach does not return a single variable.
Update:
Sample content of df:
+----------+------------------+
|product_PK| products|
+----------+------------------+
| 680|[[691,1], [692,5]]|
| 685|[[691,2], [692,2]]|
| 684|[[691,1], [692,3]]|
+----------+------------------+
I should compute the mean and standard deviation of the score values, e.g. the value 1 in [691,1] is one of the scores.
I use Spark 2.2.0.
I am using Spark to process a dataset from S3. It worked fine until I decided to use a wildcard in order to read data from the subfolders of the folder test.
val path = "s3://data/test"
val spark = SparkSession
  .builder()
  .appName("Test")
  .config("spark.sql.warehouse.dir", path)
  .enableHiveSupport()
  .getOrCreate()
import spark.implicits._
val myData = spark.read.parquet(path + "/*/")
I get the following error:
17/11/20 18:54:21 ERROR ApplicationMaster: User class threw exception: org.apache.spark.sql.AnalysisException: Path does not exist: hdfs://ip-111-112-11-65.eu-west-1.compute.internal:8020/user/hdfs/s3/data/test/20171120/*;
I execute the code above using the following command:
spark-submit --deploy-mode cluster --driver-memory 10g
I do not understand why Spark tries to read from HDFS instead of from the provided path. The same code works fine with another path, s3://data/test2/mytest.parquet.
I use Python 3 (I also have Python 2 installed), and I want to extract a country or a city from a short text, e.g. text = "I live in Spain" or text = "United States (New York), United Kingdom (London)".
Regarding countries:
I tried to install geography, but I cannot run pip install geography. I get this error:
Collecting geography
  Could not find a version that satisfies the requirement geography (from versions: )
No matching distribution found for geography
It looks like geography only works with Python 2.
I also have geopandas, but I do not know how to use it to extract the desired information from the text.
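Not a full solution, but as a library-free sketch of the basic matching approach (the country set below is a tiny hypothetical sample, not a complete dataset; a real solution would load a full list of names):

```python
# A tiny hand-made sample of country names; swap in a complete list
# (e.g. from a countries dataset) for real use
COUNTRIES = {"Spain", "United States", "United Kingdom", "France", "Germany"}

def extract_countries(text):
    """Return the known country names that appear in the text, sorted."""
    return sorted(c for c in COUNTRIES if c in text)

print(extract_countries("I live in Spain"))
print(extract_countries("United States (New York), United Kingdom (London)"))
```

Simple substring matching like this misses variant spellings ("USA", "UK"), so a production version would also need a table of aliases.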
I want to convert a list of dictionaries into a DataFrame. This is the list:
mylist = [
    {"type_activity_id": 1, "type_activity_name": "xxx"},
    {"type_activity_id": 2, "type_activity_name": "yyy"},
    {"type_activity_id": 3, "type_activity_name": "zzz"}
]
This is my code:
from pyspark.sql.types import StringType
df = spark.createDataFrame(mylist, StringType())
df.show(2,False)
+--------------------------------------------+
|value                                       |
+--------------------------------------------+
|{type_activity_id=1, type_activity_name=xxx}|
|{type_activity_id=2, type_activity_name=yyy}|
|{type_activity_id=3, type_activity_name=zzz}|
+--------------------------------------------+
I assume that I should provide some mapping and types for each column, but I do not know how to do that.
Update:
I also tried this:
from pyspark.sql.functions import from_json
from pyspark.sql.types import ArrayType, StructType, StructField, IntegerType, StringType

schema = ArrayType(
    StructType([
        StructField("type_activity_id", IntegerType()),
        StructField("type_activity_name", StringType()),
    ]))
df = spark.createDataFrame(mylist, StringType())
df = df.withColumn("value", from_json(df.value, schema))
But then I get null values:
+-----+
|value|
+-----+
| null|
| null|
+-----+
Below I provide my code. I iterate over the DataFrame prodRows, and for each product_PK I find some matching sublist of product_PKs from prodRows.
val numRecProducts = 10
var listOfProducts: Map[Long, Array[(Long, Int)]] = Map()
prodRows.foreach { row: Row =>
  val product_PK = row.get(row.fieldIndex("product_PK")).toString.toLong
  val gender = row.get(row.fieldIndex("gender_PK")).toString
  val selection = prodRows.filter($"gender_PK" === gender || $"gender_PK" === "UNISEX").limit(numRecProducts).select($"product_PK")
  var productList: Array[(Long, Int)] = Array()
  if (!selection.rdd.isEmpty()) {
    productList = selection.rdd.map(x => (x(0).toString.toLong, 1)).collect()
  }
  listOfProducts = listOfProducts + (product_PK -> productList)
}
Run Code Online (Sandbox Code Playgroud)
But when I execute it, it gives me the following error. It looks like selection is empty in some iterations. However, I do not understand how I can handle this error:
Driver stacktrace:
at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1690)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1678)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1677)
at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1677)
at …

I am creating a date in Scala.
import java.text.SimpleDateFormat
import java.util.{Date, TimeZone}

val dafo = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm'Z'")
val tz = TimeZone.getTimeZone("UTC")
dafo.setTimeZone(tz)
val endTime = dafo.format(new Date())
How can I set yesterday's date instead of today's date?
I am writing a function that should return multiple DataFrames:
val df1, df2, df3 = getData(spark, df1, df2, df3)

def getData(spark: SparkSession,
            path1: String,
            path2: String,
            path3: String): DataFrame = {
  val epoch = System.currentTimeMillis() / 1000
  val df1 = spark.read.parquet(path1)
  val df2 = spark.read.parquet(path2)
  val df3 = spark.read.parquet(path3)
  df1, df2, df3
}
However, I get a compilation error saying that df1, df2, df3 cannot be returned.