小编use*_*931的帖子

具有复杂条件的Spark SQL窗口函数

这可能是最容易通过示例解释的.假设我有一个用户登录网站的DataFrame,例如:

scala> df.show(5)
+----------------+----------+
|       user_name|login_date|
+----------------+----------+
|SirChillingtonIV|2012-01-04|
|Booooooo99900098|2012-01-04|
|Booooooo99900098|2012-01-06|
|  OprahWinfreyJr|2012-01-10|
|SirChillingtonIV|2012-01-11|
+----------------+----------+
only showing top 5 rows
Run Code Online (Sandbox Code Playgroud)

我想在此列添加一个列,指示他们何时成为网站上的活跃用户.但有一点需要注意:有一段时间用户被认为是活动的,在此期间之后,如果他们再次登录,他们的became_active日期会重置.假设这段时间是5天.然后从上表派生的所需表将是这样的:

+----------------+----------+-------------+
|       user_name|login_date|became_active|
+----------------+----------+-------------+
|SirChillingtonIV|2012-01-04|   2012-01-04|
|Booooooo99900098|2012-01-04|   2012-01-04|
|Booooooo99900098|2012-01-06|   2012-01-04|
|  OprahWinfreyJr|2012-01-10|   2012-01-10|
|SirChillingtonIV|2012-01-11|   2012-01-11|
+----------------+----------+-------------+
Run Code Online (Sandbox Code Playgroud)

因此,特别是,SirChillingtonIV的became_active日期被重置,因为他们的第二次登录是在活动期过期之后,但是Booooooo99900098的became_active日期没有在他/她登录的第二次重置,因为它落在活动期间.

我最初的想法是使用窗口函数lag,然后使用lagged值填充became_active列; 例如,大致类似于:

import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions._

val window = Window.partitionBy("user_name").orderBy("login_date")
val df2 = df.withColumn("tmp", lag("login_date", 1).over(window))
Run Code Online (Sandbox Code Playgroud)

然后,规则填写became_active日期会是这样,如果tmpnull(即,如果它是第一次登录),或者如果login_date - tmp >= 5再 …

sql window-functions apache-spark apache-spark-sql pyspark

22
推荐指数
2
解决办法
2万
查看次数

超过`spark.driver.maxResultSize`而不向驱动程序提供任何数据

我有一个执行大型连接的Spark应用程序

val joined = uniqueDates.join(df, $"start_date" <= $"date" && $"date" <= $"end_date")
Run Code Online (Sandbox Code Playgroud)

然后将生成的DataFrame汇总到一个可能有13k行的一个.在连接过程中,作业失败,并显示以下错误消息:

Caused by: org.apache.spark.SparkException: Job aborted due to stage failure: Total size of serialized results of 78021 tasks is bigger than spark.driver.maxResultSize (2.0 GB)
Run Code Online (Sandbox Code Playgroud)

这是在没有设置之前发生的spark.driver.maxResultSize,所以我设置了spark.driver.maxResultSize=2G.然后,我对连接条件稍作修改,错误重新出现.

编辑:在调整集群,我也加倍数据帧呈现在分区数.coalesce(256).coalesce(512),所以我不能确定这是因为,不是.

我的问题是,既然我没有向司机收集任何东西,为什么要spark.driver.maxResultSize在这里重要?驱动程序的内存是否用于我不知道的连接中的某些内容?

memory scala apache-spark apache-spark-sql

17
推荐指数
1
解决办法
2723
查看次数

处理Spark中的大型gzip压缩文件

我有一个来自s3的大型(大约85 GB压缩)gzip压缩文件,我正在尝试使用AWS EMR上的Spark处理(现在有一个m4.xlarge主实例和两个m4.10xlarge核心实例,每个实例都有一个100 GB的EBS卷) .我知道gzip是一种不可拆分的文件格式, 看到 建议应该重新分区压缩文件,因为Spark最初给出了一个带有一个分区的RDD.但是,做完之后

scala> val raw = spark.read.format("com.databricks.spark.csv").
     | options(Map("delimiter" -> "\\t", "codec" -> "org.apache.hadoop.io.compress.GzipCodec")).
     | load("s3://path/to/file.gz").
     | repartition(sc.defaultParallelism * 3)
raw: org.apache.spark.sql.Dataset[org.apache.spark.sql.Row] = [_c0: string, _c1: string ... 48 more fields
scala> raw.count()
Run Code Online (Sandbox Code Playgroud)

并且看一下Spark应用程序UI,我仍然看到只有一个活动执行程序(其他14个已经死了)有一个任务,并且作业永远不会完成(或者至少我没有等待足够长的时间).

  • 这里发生了什么?有人可以帮我理解Spark在这个例子中是如何工作的吗?
  • 我应该使用不同的群集配置吗?
  • 不幸的是,我无法控制压缩模式,但有没有其他方法来处理这样的文件?

gzip amazon-emr apache-spark

9
推荐指数
2
解决办法
8785
查看次数

在 bash 脚本中包含源代码时,如何修复或屏蔽 shellcheck 发现的 1091 错误?

Shellsheck 是一个 shell 脚本的静态分析工具,可以在某些 Linux 系统上本地安装,也可以不安装在线使用来检查 bash 脚本是否存在一些错误。

测试环境:

  • Linux Mint(Ubuntu 版本)
  • 给出了一个工作 bash 脚本,它回显“我是来自主文件的回显”和一个源文件,它回显“我是来自源文件的回显”。
  • Booth 文件位于同一文件夹中。
  • 通过本地安装版本使用 shellcheck 0.7.1-1 进行测试。

主目录

#!/bin/bash

source ./sourcefile.sh

echo "Output from main.sh"
echo
echo

fkt_output_from_sourcefile_sh
echo
echo

echo "For closing window, press Enter or Ctl + C"; read -r
Run Code Online (Sandbox Code Playgroud)

源文件.sh

#!/bin/bash

fkt_output_from_sourcefile_sh() {
        echo "Output from sourcefile.sh"
    }
Run Code Online (Sandbox Code Playgroud)

我是如何在终端上运行它的:

shellcheck -x main.sh
Run Code Online (Sandbox Code Playgroud)

终端输出(看起来工作正常):

Output from main.sh


Output from sourcefile.sh


For closing window, press Enter or Ctl + C
Run Code Online (Sandbox Code Playgroud)

通过 shellcheck …

bash shellcheck

7
推荐指数
1
解决办法
5750
查看次数

为什么 S3 对象的 ETag 在副本下会发生变化?

我正在尝试使用 boto3 在不同帐户的存储桶中的前缀之间执行 S3 同步。我的尝试继续列出帐户 A 中的源存储桶/前缀中的对象,列出帐户 B 中的目标存储桶/前缀中的对象,然后复制前者中 ETag 与帐户 B 中对象的 ETag 不匹配的对象。后者。这似乎是正确的做法。

但是,似乎即使复制操作成功,每次执行复制时目标对象的 ETag 都不一样。具体来说,

>>> # Here is the source object: {'Key': 'blah/blah/file_20210328_232250.parquet', 'LastModified': datetime.datetime(2021, 3, 28, 23, 38, 2, tzinfo=tzutc()), 'ETag': '"ba230f7a358cf1bee6c98250089da435"', 'Size': 52319, 'StorageClass': 'STANDARD'}
>>> client.copy_object(
CopySource={"Bucket": "source-bucket-in-acct-a", "Key": "blah/blah/file_20210328_232250.parquet"),
Bucket="dest-bucket-in-acct-b",
Key="blah/blah/file_20210328_232250.parquet"
)
... 'CopyObjectResult': {'ETag': '"84f11f744cf996e16a3af0d6d2fbee07"', 'LastModified': datetime.datetime(2021, 4, 20, 2, 23, 40, tzinfo=tzutc())}}
Run Code Online (Sandbox Code Playgroud)

请注意,ETag 已更改。如果我再次运行该副本,它将再次具有不同的 ETag。我已经尝试了复制请求的所有附加参数(MetadataDirective="COPY"等)。我可能缺少保留 ETag 的东西,但我的理解是 ETag 是从对象的数据派生的,而不是其元数据。

现在, AWS 文档中说ETag对于成功的非多部分复制操作是相同的,事实确实如此,但情况似乎并非如此。它不是多部分副本,我已经检查了实际数据;它们是相同的。因此,我的问题是:

如果不是因为复制不成功,对象的 ETag 怎么会改变呢?

amazon-s3 amazon-web-services boto3

6
推荐指数
1
解决办法
3587
查看次数

在 Scala 中测试数组是否为空

问题不在于如何测试数组是否为空(arr.length == 0这很好)。相反,我的问题是,为什么

scala> Array().isEmpty
res1: Boolean = true
Run Code Online (Sandbox Code Playgroud)

工作和

scala> val x = Array[String]()
x: Array[String] = Array()
scala> x.isEmpty
res2: Boolean = true
Run Code Online (Sandbox Code Playgroud)

工作,但是

scala> val y = Array()
y: Array[Nothing] = Array()

scala> y.isEmpty
<console>:13: error: value isEmpty is not a member of Array[Nothing]
       y.isEmpty
         ^
Run Code Online (Sandbox Code Playgroud)

才不是?

scala scala-collections

5
推荐指数
1
解决办法
8264
查看次数

为记录类型定义幺半群实例

假设我有一个类型

data Options = Options
  { _optionOne :: Maybe Integer
  , _optionTwo :: Maybe Integer
  , _optionThree :: Maybe String
  } deriving Show
Run Code Online (Sandbox Code Playgroud)

有更多的领域。我想Monoid为这种类型定义一个实例,其mempty值是 an Optionswith all fields Nothing。有没有比这更简洁的写法

instance Monoid Options where
  mempty = Options Nothing Nothing Nothing
  mappend = undefined
Run Code Online (Sandbox Code Playgroud)

Nothing当我Options有更多字段时,这将避免需要写一堆s ?

haskell typeclass monoids

5
推荐指数
1
解决办法
602
查看次数

Haskell如何立即计算这个巨大的数字?

我开始学习Haskell,当我学习一门新语言时,我喜欢做的一件事就是将Project Euler问题作为我主要参考资料的补充.

我找到了第二个问题的解决方案,即找到偶数斐波纳契数不到四百万的总和:

fibs = 0 : 1 : zipWith (+) fibs (tail fibs)
f :: Integer -> Integer
f n =
  let evenFib = filter (\n -> n `mod` 2 == 0) fibs
  in sum (takeWhile (<n) evenFib)
Run Code Online (Sandbox Code Playgroud)

这很好用; f 4000000返回正确的答案.它立即这样做.好奇,我开始输入越来越大的数字......

Prelude> f 40000000
19544084
Prelude> f 400000000000
478361013020
Prelude> f 40000000000000000000000000000000
13049874051046942401006156573274
Prelude> f 2370498572349582734598273495872349587234958723948752394857
2805750129675962215536656398462489370528480907433875715844
Run Code Online (Sandbox Code Playgroud)

立即返回这些值中的每一个.我无法保证最后两个答案的真实性,因为我在其他语言中的实现不适用于这么大的数字.

所以,我的问题是,Haskell在这做什么?它是如何即时返回这些值的(无论它们实际上是否正确)?此外,这些答案确实是正确的,还是Haskell只是制作东西?

haskell

4
推荐指数
1
解决办法
247
查看次数

在Haskell的Aeson中省略Nothing/null字段

我有一个类型

{-# LANGUAGE DeriveGeneric   #-}
{-# LANGUAGE TemplateHaskell #-}
{-# LANGUAGE MultiWayIf      #-}

import GHC.Generics
import Data.Aeson.TH
import Data.Aeson.Types

data MyJSONObject = MyJSONObject
  { name    :: String
  , ptype   :: Maybe String
  , pid     :: Maybe String
  , subject :: Maybe String
  , message :: Maybe String
  } deriving (Show, Generic)
Run Code Online (Sandbox Code Playgroud)

还有更多的Maybe String领域.我FromJSONToJSON这个TemplateHaskell功能提供

$(deriveJSON defaultOptions
  {
    omitNothingFields  = True
  , fieldLabelModifier = \f -> if
      | f == "ptype" -> "type" -- …
Run Code Online (Sandbox Code Playgroud)

haskell template-haskell aeson

3
推荐指数
1
解决办法
674
查看次数

有没有办法用Haskell的管道模拟三通?

我试图从Haskell程序中生成一个进程,我想将其标准错误流打印到屏幕,同时也将相同的流写入文件,就像tee命令实现的那样.

我可以打印标准错误流:

import Data.Conduit ((.|), runConduit)
import qualified Data.Conduit.List as CL
import Data.Conduit.Process

main :: IO ()
main = do
  (ClosedStream, ClosedStream, err, sph) <- streamingProcess (shell myCommand)

  runConduit $ err .| CL.mapM_ print
Run Code Online (Sandbox Code Playgroud)

我可以将流指向文件:

import System.IO (withFile, IOMode (..))
import Data.Conduit.Process

main :: IO ()
main = do
  let logFile = "myCommand.log"
  withFile logFile WriteMode $ \h -> do
    (ClosedStream, ClosedStream, UseProvidedHandle, sph) <-
      streamingProcess (shell myCommand) {std_err = UseHandle h}
Run Code Online (Sandbox Code Playgroud)

我怎样才能同时做到这两件事?

haskell conduit

3
推荐指数
1
解决办法
145
查看次数

Sqoop导入将TINYINT转换为BOOLEAN

我试图使用Sqoop将一个NFL播放结果的MySQL表导入HDFS.我发出以下命令来实现这个目的:

sqoop import \
--connect jdbc:mysql://127.0.0.1:3306/nfl \
--username <username> -P \
--table play
Run Code Online (Sandbox Code Playgroud)

不幸的是,有一些类型的列,TINYINT在导入时会被转换为布尔值.例如,游戏发生在游戏的四分之一处有一个"四分之一"列.如果游戏发生在第一季度,则此列中的值将转换为"真",否则将转换为"假".

事实上,我做了一个sqoop import-all-tables,导入我拥有的整个NFL数据库,它的行为就像这样统一.

有没有解决的办法,或者一些论据importimport-all-tables防止这种情况的发生?

hadoop hdfs sqoop

2
推荐指数
1
解决办法
1043
查看次数

为什么`catch`不能捕获这个异常?

我有一个Servant应用程序和一个端点,该端点在数据库中创建一条记录,然后尝试在S3位置之间复制文件。如果复制失败,我想回滚事务。我有这个接线员

{-# LANGUAGE TemplateHaskell #-}    

import Control.Monad.Catch
import Control.Monad.Except
import Control.Monad.Logger

(<??)
  :: (MonadError e m, MonadCatch m, MonadLogger m)
  => e
  -> m a
  -> m a
(<??) err a = a `catchAll` (\e -> $(logErrorSH) e >> throwError err)
infixr 0 <??
Run Code Online (Sandbox Code Playgroud)

捕获所有异常,记录异常的性质,然后引发(在我的情况下,因为我的App类型具有的实例MonadError ServantErr)a ServantErr

我的处理程序是这样的:

{-# LANGUAGE ScopedTypeVariables #-}

import           Control.Monad
import           Control.Monad.Catch
import           Control.Monad.IO.Class
import qualified Network.AWS as AWS
import           Servant

import App.Types
import App.Db


copy :: Copy -> App Text …
Run Code Online (Sandbox Code Playgroud)

haskell

1
推荐指数
1
解决办法
136
查看次数