我有一张这样的桌子:
|Quality|Schedule|Cost Control|
-------------------------------
|7 | 8.5 |10 |
|NULL | 9 |NULL |
Run Code Online (Sandbox Code Playgroud)
并且我需要计算同一张表中每一行的平均值,因此它看起来像这样:
|Quality|Schedule|Cost Control|AVG|
----------------------------------
|7 | 8.5 |10 |8.5|
|NULL | 9 |NULL |9 |
Run Code Online (Sandbox Code Playgroud)
我使用以下代码完成了此操作:
SELECT r.Quality, r.Schedule, r.CostControl,
((coalesce(r.quality,0)+
coalesce(r.schedule,0)+
coalesce(r.CostControl,0)/3) as Average
FROM dbo.Rating r
Run Code Online (Sandbox Code Playgroud)
给出下表:
|Quality|Schedule|Cost Control|AVG|
----------------------------------
|7 | 8.5 |10 |8.5|
|NULL | 9 |NULL |3 |
Run Code Online (Sandbox Code Playgroud)
我知道问题在于除数在我的select语句中进行了硬编码,但是我不知道如何使它可变。我尝试使用case语句选择一个附加列:
select Count(case when(r.quality) > 0 then 1 else 0 end +
case when (r.Schedule) > 0 then 1 else 0 end + …
Run Code Online (Sandbox Code Playgroud) 我正在聚合来自 S3 的数据并使用 Glue 将其写入 Postgres。我的问题是我需要在写入之前截断要写入的表。我已经找到了该connection_options: {"preactions":"truncate table <table_name>"}
功能,但似乎只适用于 Redshift。有没有简单的方法,使用粘合连接,只运行一个简单的截断查询?我找到了建议使用自定义 jar 或编写自定义 java 函数的答案,但我真的希望有类似的东西。以下是相关代码行:
dfFinal = df4.drop_duplicates()
datasource2 = DynamicFrame.fromDF(dfFinal, glueContext, "scans")
output = glueContext.write_dynamic_frame.from_jdbc_conf(frame = datasource2, catalog_connection = "MPtest", connection_options = {"preactions":"truncate table scans_staging;","database" : "app", "dbtable" : "scans_staging"})
Run Code Online (Sandbox Code Playgroud) 我有一个像这样的数据框(df_full):
|cust_id|address |store_id|email |sales_channel|category|
-------------------------------------------------------------------
|1234567|123 Main St|10SjtT |idk@gmail.com|ecom |direct |
|4567345|345 Main St|10SjtT |101@gmail.com|instore |direct |
|1569457|876 Main St|51FstT |404@gmail.com|ecom |direct |
Run Code Online (Sandbox Code Playgroud)
我想将最后 4 个字段组合成一个元数据字段,它是一个像这样的字典:
|cust_id|address |metadata |
-------------------------------------------------------------------------------------------------------------------
|1234567|123 Main St|{'store_id':'10SjtT', 'email':'idk@gmail.com','sales_channel':'ecom', 'category':'direct'} |
|4567345|345 Main St|{'store_id':'10SjtT', 'email':'101@gmail.com','sales_channel':'instore', 'category':'direct'}|
|1569457|876 Main St|{'store_id':'51FstT', 'email':'404@gmail.com','sales_channel':'ecom', 'category':'direct'} |
Run Code Online (Sandbox Code Playgroud)
那可能吗?我在堆栈溢出方面看到了一些解决方案,但没有一个解决将超过 2 个字段组合到字典字段中的问题。
不久前我为 python 提出了这个问题,但现在我需要在 PySpark 中做同样的事情。
我有一个像这样的数据框(df):
|cust_id|address |store_id|email |sales_channel|category|
-------------------------------------------------------------------
|1234567|123 Main St|10SjtT |idk@gmail.com|ecom |direct |
|4567345|345 Main St|10SjtT |101@gmail.com|instore |direct |
|1569457|876 Main St|51FstT |404@gmail.com|ecom |direct |
Run Code Online (Sandbox Code Playgroud)
我想将最后 4 个字段组合成一个元数据字段,它是一个 json 格式,如下所示:
|cust_id|address |metadata |
-------------------------------------------------------------------------------------------------------------------
|1234567|123 Main St|{'store_id':'10SjtT', 'email':'idk@gmail.com','sales_channel':'ecom', 'category':'direct'} |
|4567345|345 Main St|{'store_id':'10SjtT', 'email':'101@gmail.com','sales_channel':'instore', 'category':'direct'}|
|1569457|876 Main St|{'store_id':'51FstT', 'email':'404@gmail.com','sales_channel':'ecom', 'category':'direct'} |
Run Code Online (Sandbox Code Playgroud)
这是我用来在 python 中执行此操作的代码:
cols = [
'store_id',
'store_category',
'sales_channel',
'email'
]
df1 = df.copy()
df1['metadata'] = df1[cols].to_dict(orient='records')
df1 = df1.drop(columns=cols)
Run Code Online (Sandbox Code Playgroud)
但我想将其转换为 PySpark 代码以使用 spark 数据框;我不想在 Spark …
我正在尝试启动一个包含 Postgres 驱动程序 JAR 文件的 EMR 集群,以便我可以从 Postgres 加载数据并使用 PySpark 进行分析。我有我想要包含的 JAR,存储在 S3 中。我尝试过以下操作:
1 - 输入以下配置:
[
{
"Classification": "presto-connector-postgresql",
"Properties": {
"connection-url": "jdbc:postgresql://example.net:5432/database",
"connection-user": "MYUSER",
"connection-password": "MYPASS"
},
"Configurations": []
}
]
Run Code Online (Sandbox Code Playgroud)
2 - 添加 JAR 作为自定义步骤(从 S3 选择 JAR)
3 - 添加 JAR 作为自定义引导操作(从 S3 选择 JAR)
这些都不起作用,我无法弄清楚如何在 Jupyter 中使用步骤 1 中的连接器,并且当我启动集群时,自定义步骤/引导操作都会失败。如何启动安装了 Postgres 驱动程序的 EMR 集群,以便可以在 Jupyter 中查询数据?
编辑:
我使用以下引导脚本将 JAR 复制到我的主/工作节点:
#!/bin/bash
aws s3 cp s3://BUCKETNAME/postgresql-42.2.8.jar /mnt1/myfolder
Run Code Online (Sandbox Code Playgroud)
但仍然出现以下错误:
An error was encountered:
An error occurred while …
Run Code Online (Sandbox Code Playgroud) 我有以下示例数据框:
+------------------+-----------+
|order_completed_at|static_date|
+------------------+-----------+
|6/16/2021 21:29 |2021-10-10 |
|6/7/2021 9:29 |2021-10-10 |
|6/12/2021 15:35 |2021-10-10 |
|6/18/2021 22:25 |2021-10-10 |
|6/16/2021 5:25 |2021-10-10 |
+------------------+-----------+
Run Code Online (Sandbox Code Playgroud)
其中两个字段都是字符串类型。我需要将它们转换为时间戳,我可以使用以下代码来完成:
from pyspark.sql import functions as sql_functions
order_dates = order_dates.withColumn("order_completed_at_test", sql_functions.when() sql_functions.unix_timestamp(
sql_functions.col('order_completed_at'), "MM/dd/yyyy").cast("timestamp"))
order_dates = order_dates.withColumn("static_date_test", sql_functions.to_timestamp("static_date"))
Run Code Online (Sandbox Code Playgroud)
然而,在order_completed_at
列中,可以有多种格式的混合,例如它可以是MM/dd/yyyy
或yyyy-MM-dd
是否可以编写一个可以解释两种日期时间格式的表达式?
编辑:
我将结束这个问题,因为正如两个答案都指出的那样,处理动态日期格式的这条道路非常滑坡。我将要求客户更改源数据
我使用以下 docker-compose.yaml 在端口 8080 上本地运行气流:
version: '3.7'
services:
postgres:
image: postgres:9.6
environment:
- POSTGRES_USER=airflow
- POSTGRES_PASSWORD=airflow
- POSTGRES_DB=airflow
logging:
options:
max-size: 10m
max-file: "3"
webserver:
image: puckel/docker-airflow:1.10.9
restart: always
depends_on:
- postgres
environment:
- LOAD_EX=y
- EXECUTOR=Local
logging:
options:
max-size: 10m
max-file: "3"
volumes:
- ./dags:/usr/local/airflow/dags
# Add this to have third party packages
- ./requirements.txt:/requirements.txt
# - ./plugins:/usr/local/airflow/plugins
ports:
- "8080:8080"
command: webserver
healthcheck:
test: ["CMD-SHELL", "[ -f /usr/local/airflow/airflow-webserver.pid ]"]
interval: 30s
timeout: 30s
retries: 3
Run Code Online (Sandbox Code Playgroud)
但是我需要端口 8080 用于另一个进程。我尝试更新两者 …
我正在尝试编写一个 UDF 来计算具有条件格式的单元格的数量。我写了以下子,就像一个魅力:
Sub SumCountByConditionalFormat()
Dim cellrngi As Range
Dim cntresi As Long
cntresi = 0
Set cellrngi = Sheets("Sheet3").Range("I2:I81")
For Each i In cellrngi
If i.DisplayFormat.Interior.Color <> 16777215 Then
cntresi = cntresi + 1
End If
Next i
end sub
Run Code Online (Sandbox Code Playgroud)
我尝试使用以下代码将其转换为 UDF:
Function CountCellsByColor1(rData As Range) As Long
Dim cntRes As Long
Application.Volatile
cntRes = 0
For Each cell In rData
If cell.DisplayFormat.Interior.Color <> 16777215 Then
cntRes = cntRes + 1
End If
Next cell
CountCellsByColor1 = cntRes
End …
Run Code Online (Sandbox Code Playgroud) 我在 S3 中有一个 JSON 文件,采用以下结构的格式:
type StockInfo []struct {
Ticker string `json:"ticker"`
BoughtPrice string `json:"boughtPrice"`
NumberOfShares string `json:"numberOfShares"`
}
Run Code Online (Sandbox Code Playgroud)
我想将其读入 S3 中的结构值。在 python 中,代码看起来像这样:
import boto3
import json
s3 = boto3.client('s3', 'us-east-1')
obj = s3.get_object(Bucket=os.environ["BucketName"], Key=os.environ["Key"])
fileContents = obj['Body'].read().decode('utf-8')
json_content = json.loads(fileContents)
Run Code Online (Sandbox Code Playgroud)
然而我有点困惑如何在 Go 中实现这一点。我已经走到这一步了:
package main
import (
"archive/tar"
"bytes"
"fmt"
"log"
"os"
"github.com/aws/aws-sdk-go/aws"
"github.com/aws/aws-sdk-go/aws/session"
"github.com/aws/aws-sdk-go/service/s3"
"github.com/aws/aws-sdk-go/service/s3/s3manager"
"github.com/joho/godotenv"
)
type StockInfo []struct {
Ticker string `json:"ticker"`
BoughtPrice string `json:"boughtPrice"`
NumberOfShares string `json:"numberOfShares"`
}
func init() {
// loads values …
Run Code Online (Sandbox Code Playgroud) python ×4
pyspark ×3
apache-spark ×2
postgresql ×2
airflow ×1
amazon-emr ×1
amazon-s3 ×1
aws-glue ×1
dataframe ×1
docker ×1
excel ×1
go ×1
json ×1
pandas ×1
sql ×1
sql-server ×1
vba ×1