我有一个数据集,其中 1 列是long代表毫秒的 a 。yyyy-MM-dd HH:mm:ss我想获取该数字在UTC中表示的时间戳 ( ) 。基本上我想要与https://currentmillis.com/相同的行为
我的问题是,有没有办法让 Spark 代码将毫秒长字段转换为 UTC 时间戳?我能够使用本机 Spark 代码得到的只是将那么长的时间转换为我的本地时间 (EST):
from pyspark import SparkContext
from pyspark.sql import SQLContext
from pyspark.sql import types as T
from pyspark.sql import functions as F
sc = SparkContext()
spark = SQLContext(sc)
df = spark.read.json(sc.parallelize([{'millis':1582749601000}]))
df.withColumn('as_date', F.from_unixtime((F.col('millis')/1000))).show()
+-------------+-------------------+
| millis| as_date|
+-------------+-------------------+
|1582749601000|2020-02-26 15:40:01|
+-------------+-------------------+
Run Code Online (Sandbox Code Playgroud)
我已经能够通过强制整个 Spark 会话的时区来转换为 UTC。不过,我想避免这种情况,因为必须为该作业中的特定用例更改整个 Spark 会话时区感觉是错误的。
spark.sparkSession.builder.master('local[1]').config("spark.sql.session.timeZone", "UTC").getOrCreate()
Run Code Online (Sandbox Code Playgroud)
我还想避免自定义函数,因为我希望能够在 Scala 和 Python 中部署它,而无需在每个函数中编写特定于语言的代码。
我有一个 Jupyter 笔记本,需要从命令行运行。为此,我有以下命令:
jupyter nbconvert --execute my_jupyter_notebook.ipynb --to python
Run Code Online (Sandbox Code Playgroud)
This command creates a python script and then executes it. However, I'm using the logging library in Python to log certain events. When it executes the script from the command above, nothing can be seen on the terminal.
However, when I execute manually the converted jupyter, like below, I can see all the logs on my terminal:
python3 my_jupyter_notebook.py
Run Code Online (Sandbox Code Playgroud)
I've tried adding extra arguments like --debug and --stdout but those just …
我有一段代码可以将文件放入 FTP 服务器。看起来像这样:
"C:\Program Files (x86)\WinSCP\WinSCP.com" /command "open user@myFTPServer:MyPort/MyPath/ -privatekey=myprivatekey.ppk" "put myfile.txt" "exit"
Run Code Online (Sandbox Code Playgroud)
这在 Windows 命令提示符下运行良好,但是在 PowerShell 中运行相同的东西时它不起作用并返回以下错误:
“您必须在'/'运算符之后提供一个值表达式”
我已经尝试了我的代码的几种组合,但都没有奏效:
1)
"C:\Program Files (x86)\WinSCP\WinSCP.com" -command "open user@myFTPServer:MyPort/MyPath/ -privatekey=myprivatekey.ppk" "put myfile.txt" "exit"
Run Code Online (Sandbox Code Playgroud)
2)
"C:\Program Files (x86)\WinSCP\WinSCP.com" -command ""open user@myFTPServer:MyPort/MyPath/ -privatekey=myprivatekey.ppk" "put myfile.txt" "exit""
Run Code Online (Sandbox Code Playgroud) 我有一个 .csv 文件,但无法在 Octave 上读取它。在 RI 上,只需使用以下命令,一切都会正常读取:
myData <- read.csv("myData.csv", stringsAsFactors = FALSE)
Run Code Online (Sandbox Code Playgroud)
但是,当我转到 Octave 时,它无法使用以下命令正确执行:
myData = csvread('myData.csv',1,0);
Run Code Online (Sandbox Code Playgroud)
当我用记事本打开该文件时,数据如下所示。请注意,最后一个列名称(即 Column3)与第一个值(即 Value1)之间没有逗号分隔,并且第一行的最后一个值(即 Value3)和第二行的第一个值(即即值4)
Column1,Column2,Column3Value1,Value2,Value3Value4,Value5,Value6
Run Code Online (Sandbox Code Playgroud)
Column1 用于日期值(格式为 yyyy-mm-dd hh:mm:ss),我不知道这是否与问题有关。
我有一个计算时间太长的脚本,我正在尝试并行化其执行。
该脚本基本上循环遍历数据帧的每一行并执行一些计算,如下所示:
my.df = data.frame(id=1:9,value=11:19)
sumPrevious <- function(df,df.id){
sum(df[df$id<=df.id,"value"])
}
for(i in 1:nrow(my.df)){
print(sumPrevious(my.df,my.df[i,"id"]))
}
Run Code Online (Sandbox Code Playgroud)
我开始学习在 R 中并行化代码,这就是为什么我首先想了解如何使用类似 apply 的函数(例如 sapply、lapply、mapply)来做到这一点。
我尝试了多种方法,但到目前为止没有任何效果:
mapply(sumPrevious,my.df,my.df$id) # Error in df$id : $ operator is invalid for atomic vectors
Run Code Online (Sandbox Code Playgroud) 我最近安装了最新版本的RStudio,现在每次保存我正在处理的".R"文件时,RStudio都会在保存时执行代码.如何禁用此功能?
我正在本地运行的 Kubernetes(Docker 桌面)中提交 Spark 作业。我能够提交作业并在屏幕上看到它们的最终输出。
但是,即使它们已完成,驱动程序和执行程序 Pod 仍处于 RUNNING 状态。
用于将 Spark 作业提交到 kubernetes 的基本映像是 Spark 附带的映像,如文档中所述。
这是我的spark-submit命令的样子:
~/spark-2.4.3-bin-hadoop2.7/bin/spark-submit \
--master k8s://https://kubernetes.docker.internal:6443 \
--deploy-mode cluster \
--name my-spark-job \
--conf spark.kubernetes.container.image=my-spark-job \
--conf spark.kubernetes.authenticate.driver.serviceAccountName=spark \
--conf spark.kubernetes.submission.waitAppCompletion=false \
local:///opt/spark/work-dir/my-spark-job.py
Run Code Online (Sandbox Code Playgroud)
这就是kubectl get pods返回的内容:
NAME READY STATUS RESTARTS AGE
my-spark-job-1568669908677-driver 1/1 Running 0 11m
my-spark-job-1568669908677-exec-1 1/1 Running 0 10m
my-spark-job-1568669908677-exec-2 1/1 Running 0 10m
Run Code Online (Sandbox Code Playgroud) 我正在尝试设置 Spark 作业来使用来自 Kafka 的数据。Kafka 代理已设置 SSL,但我无法正确构建/验证消费者。
spark-shell命令:
spark-2.3.4-bin-hadoop2.7/bin/spark-shell
--packages org.apache.spark:spark-sql-kafka-0-10_2.11:2.3.4
--files "spark-kafka.jaas"
--driver-java-options "-Djava.security.auth.login.config=./spark-kafka.jaas"
--conf "spark.executor.extraJavaOptions=-Djava.security.auth.login.config=./spark-kafka.jaas"
Run Code Online (Sandbox Code Playgroud)
spark-kafka.jaas
KafkaClient {
org.apache.kafka.common.security.plain.PlainLoginModule required
username="myusername"
password="mypwd"
};
Run Code Online (Sandbox Code Playgroud)
外壳命令:
val df = spark
.read
.format("kafka")
.option("kafka.bootstrap.servers", "host1:port1, host2:port2")
.option("kafka.security.protocol", "SASL_SSL")
.option("kafka.ssl.truststore.location", "./truststore.jks")
.option("kafka.ssl.truststore.password", "truststore-pwd")
.option("kafka.ssl.endpoint.identification.algorithm", "")
.option("kafka.sasl.mechanism", "SCRAM-SHA-256")
.option("subscribe", "mytopic")
.option("startingOffsets", "earliest")
.load()
df.show()
Run Code Online (Sandbox Code Playgroud)
错误:
2019-09-23 16:32:19 WARN ObjectStore:568 - Failed to get database global_temp, returning NoSuchObjectException
org.apache.kafka.common.KafkaException: Failed to construct kafka consumer
at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:702)
at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:557)
at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:540)
at org.apache.spark.sql.kafka010.SubscribeStrategy.createConsumer(ConsumerStrategy.scala:62)
at …Run Code Online (Sandbox Code Playgroud) 我有一个需要根据某些条件进行过滤的项目列表。我想知道 Dask 是否可以并行执行此过滤,因为列表很长(几十万条记录)。
基本上,我需要做的是:
items = [
{'type': 'dog', 'weight': 10},
{'type': 'dog', 'weight': 20},
{'type': 'cat', 'weight': 15},
{'type': 'dog', 'weight': 30},
]
def item_is_valid(item):
item_is_valid = True
if item['type']=='cat':
item_is_valid = False
elif item['weight']>20:
item_is_valid = False
# ...
# elif for n conditions
return item_is_valid
items_filtered = [item for item in items if item_is_valid(item)]
Run Code Online (Sandbox Code Playgroud)
通过 Dask,我实现了以下目标:
def item_is_valid_v2(item):
"""Return the whole item if valid."""
item_is_valid = True
if item['type']=='cat':
item_is_valid = False
elif item['weight']>20:
item_is_valid = False …Run Code Online (Sandbox Code Playgroud) 我在Azure上设置了一个Linux VM - Ubuntu Server 14.04 LTS.
我的目标是能够从Windows 10进行远程桌面连接.
我是Linux的新手,这就是我一直关注本教程的原因.一切似乎工作正常,直到我需要创建一个"独立端点".Azure的门户网站中的界面已更改.我所做的是创建一个端点,如下图所示,但当我尝试单击"连接"时,该选项仍然被禁用.
apache-spark ×3
python ×2
r ×2
apache-kafka ×1
azure ×1
bag ×1
csv ×1
dask ×1
dask-delayed ×1
dictionary ×1
ftp ×1
jaas ×1
jupyter ×1
jupyter-lab ×1
kubernetes ×1
lapply ×1
linux ×1
mapply ×1
matlab ×1
nbconvert ×1
octave ×1
powershell ×1
pyspark ×1
rstudio ×1
sapply ×1
sftp ×1
ssl ×1
ubuntu ×1
windows ×1
winscp ×1