这可能是最容易通过示例解释的.假设我有一个用户登录网站的DataFrame,例如:
scala> df.show(5)
+----------------+----------+
| user_name|login_date|
+----------------+----------+
|SirChillingtonIV|2012-01-04|
|Booooooo99900098|2012-01-04|
|Booooooo99900098|2012-01-06|
| OprahWinfreyJr|2012-01-10|
|SirChillingtonIV|2012-01-11|
+----------------+----------+
only showing top 5 rows
Run Code Online (Sandbox Code Playgroud)
我想在此列添加一个列,指示他们何时成为网站上的活跃用户.但有一点需要注意:有一段时间用户被认为是活动的,在此期间之后,如果他们再次登录,他们的became_active日期会重置.假设这段时间是5天.然后从上表派生的所需表将是这样的:
+----------------+----------+-------------+
| user_name|login_date|became_active|
+----------------+----------+-------------+
|SirChillingtonIV|2012-01-04| 2012-01-04|
|Booooooo99900098|2012-01-04| 2012-01-04|
|Booooooo99900098|2012-01-06| 2012-01-04|
| OprahWinfreyJr|2012-01-10| 2012-01-10|
|SirChillingtonIV|2012-01-11| 2012-01-11|
+----------------+----------+-------------+
Run Code Online (Sandbox Code Playgroud)
因此,特别是,SirChillingtonIV的became_active日期被重置,因为他们的第二次登录是在活动期过期之后,但是Booooooo99900098的became_active日期没有在他/她登录的第二次重置,因为它落在活动期间.
我最初的想法是使用窗口函数lag,然后使用lagged值填充became_active列; 例如,大致类似于:
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions._
val window = Window.partitionBy("user_name").orderBy("login_date")
val df2 = df.withColumn("tmp", lag("login_date", 1).over(window))
Run Code Online (Sandbox Code Playgroud)
然后,规则填写became_active日期会是这样,如果tmp是null(即,如果它是第一次登录),或者如果login_date - tmp >= 5再 …
我有一个执行大型连接的Spark应用程序
val joined = uniqueDates.join(df, $"start_date" <= $"date" && $"date" <= $"end_date")
Run Code Online (Sandbox Code Playgroud)
然后将生成的DataFrame汇总到一个可能有13k行的一个.在连接过程中,作业失败,并显示以下错误消息:
Caused by: org.apache.spark.SparkException: Job aborted due to stage failure: Total size of serialized results of 78021 tasks is bigger than spark.driver.maxResultSize (2.0 GB)
Run Code Online (Sandbox Code Playgroud)
这是在没有设置之前发生的spark.driver.maxResultSize,所以我设置了spark.driver.maxResultSize=2G.然后,我对连接条件稍作修改,错误重新出现.
编辑:在调整集群,我也加倍数据帧呈现在分区数.coalesce(256)的.coalesce(512),所以我不能确定这是因为,不是.
我的问题是,既然我没有向司机收集任何东西,为什么要spark.driver.maxResultSize在这里重要?驱动程序的内存是否用于我不知道的连接中的某些内容?
我有一个来自s3的大型(大约85 GB压缩)gzip压缩文件,我正在尝试使用AWS EMR上的Spark处理(现在有一个m4.xlarge主实例和两个m4.10xlarge核心实例,每个实例都有一个100 GB的EBS卷) .我知道gzip是一种不可拆分的文件格式,我 看到 它 建议应该重新分区压缩文件,因为Spark最初给出了一个带有一个分区的RDD.但是,做完之后
scala> val raw = spark.read.format("com.databricks.spark.csv").
| options(Map("delimiter" -> "\\t", "codec" -> "org.apache.hadoop.io.compress.GzipCodec")).
| load("s3://path/to/file.gz").
| repartition(sc.defaultParallelism * 3)
raw: org.apache.spark.sql.Dataset[org.apache.spark.sql.Row] = [_c0: string, _c1: string ... 48 more fields
scala> raw.count()
Run Code Online (Sandbox Code Playgroud)
并且看一下Spark应用程序UI,我仍然看到只有一个活动执行程序(其他14个已经死了)有一个任务,并且作业永远不会完成(或者至少我没有等待足够长的时间).
Shellsheck 是一个 shell 脚本的静态分析工具,可以在某些 Linux 系统上本地安装,也可以不安装在线使用来检查 bash 脚本是否存在一些错误。
测试环境:
主目录
#!/bin/bash
source ./sourcefile.sh
echo "Output from main.sh"
echo
echo
fkt_output_from_sourcefile_sh
echo
echo
echo "For closing window, press Enter or Ctl + C"; read -r
Run Code Online (Sandbox Code Playgroud)
源文件.sh
#!/bin/bash
fkt_output_from_sourcefile_sh() {
echo "Output from sourcefile.sh"
}
Run Code Online (Sandbox Code Playgroud)
我是如何在终端上运行它的:
shellcheck -x main.sh
Run Code Online (Sandbox Code Playgroud)
终端输出(看起来工作正常):
Output from main.sh
Output from sourcefile.sh
For closing window, press Enter or Ctl + C
Run Code Online (Sandbox Code Playgroud)
通过 shellcheck …
我正在尝试使用 boto3 在不同帐户的存储桶中的前缀之间执行 S3 同步。我的尝试继续列出帐户 A 中的源存储桶/前缀中的对象,列出帐户 B 中的目标存储桶/前缀中的对象,然后复制前者中 ETag 与帐户 B 中对象的 ETag 不匹配的对象。后者。这似乎是正确的做法。
但是,似乎即使复制操作成功,每次执行复制时目标对象的 ETag 都不一样。具体来说,
>>> # Here is the source object: {'Key': 'blah/blah/file_20210328_232250.parquet', 'LastModified': datetime.datetime(2021, 3, 28, 23, 38, 2, tzinfo=tzutc()), 'ETag': '"ba230f7a358cf1bee6c98250089da435"', 'Size': 52319, 'StorageClass': 'STANDARD'}
>>> client.copy_object(
CopySource={"Bucket": "source-bucket-in-acct-a", "Key": "blah/blah/file_20210328_232250.parquet"),
Bucket="dest-bucket-in-acct-b",
Key="blah/blah/file_20210328_232250.parquet"
)
... 'CopyObjectResult': {'ETag': '"84f11f744cf996e16a3af0d6d2fbee07"', 'LastModified': datetime.datetime(2021, 4, 20, 2, 23, 40, tzinfo=tzutc())}}
Run Code Online (Sandbox Code Playgroud)
请注意,ETag 已更改。如果我再次运行该副本,它将再次具有不同的 ETag。我已经尝试了复制请求的所有附加参数(MetadataDirective="COPY"等)。我可能缺少保留 ETag 的东西,但我的理解是 ETag 是从对象的数据派生的,而不是其元数据。
现在, AWS 文档中说ETag对于成功的非多部分复制操作是相同的,事实确实如此,但情况似乎并非如此。它不是多部分副本,我已经检查了实际数据;它们是相同的。因此,我的问题是:
如果不是因为复制不成功,对象的 ETag 怎么会改变呢?
问题不在于如何测试数组是否为空(arr.length == 0这很好)。相反,我的问题是,为什么
scala> Array().isEmpty
res1: Boolean = true
Run Code Online (Sandbox Code Playgroud)
工作和
scala> val x = Array[String]()
x: Array[String] = Array()
scala> x.isEmpty
res2: Boolean = true
Run Code Online (Sandbox Code Playgroud)
工作,但是
scala> val y = Array()
y: Array[Nothing] = Array()
scala> y.isEmpty
<console>:13: error: value isEmpty is not a member of Array[Nothing]
y.isEmpty
^
Run Code Online (Sandbox Code Playgroud)
才不是?
假设我有一个类型
data Options = Options
{ _optionOne :: Maybe Integer
, _optionTwo :: Maybe Integer
, _optionThree :: Maybe String
} deriving Show
Run Code Online (Sandbox Code Playgroud)
有更多的领域。我想Monoid为这种类型定义一个实例,其mempty值是 an Optionswith all fields Nothing。有没有比这更简洁的写法
instance Monoid Options where
mempty = Options Nothing Nothing Nothing
mappend = undefined
Run Code Online (Sandbox Code Playgroud)
Nothing当我Options有更多字段时,这将避免需要写一堆s ?
我开始学习Haskell,当我学习一门新语言时,我喜欢做的一件事就是将Project Euler问题作为我主要参考资料的补充.
我找到了第二个问题的解决方案,即找到偶数斐波纳契数不到四百万的总和:
fibs = 0 : 1 : zipWith (+) fibs (tail fibs)
f :: Integer -> Integer
f n =
let evenFib = filter (\n -> n `mod` 2 == 0) fibs
in sum (takeWhile (<n) evenFib)
Run Code Online (Sandbox Code Playgroud)
这很好用; f 4000000返回正确的答案.它立即这样做.好奇,我开始输入越来越大的数字......
Prelude> f 40000000
19544084
Prelude> f 400000000000
478361013020
Prelude> f 40000000000000000000000000000000
13049874051046942401006156573274
Prelude> f 2370498572349582734598273495872349587234958723948752394857
2805750129675962215536656398462489370528480907433875715844
Run Code Online (Sandbox Code Playgroud)
立即返回这些值中的每一个.我无法保证最后两个答案的真实性,因为我在其他语言中的实现不适用于这么大的数字.
所以,我的问题是,Haskell在这做什么?它是如何即时返回这些值的(无论它们实际上是否正确)?此外,这些答案确实是正确的,还是Haskell只是制作东西?
我有一个类型
{-# LANGUAGE DeriveGeneric #-}
{-# LANGUAGE TemplateHaskell #-}
{-# LANGUAGE MultiWayIf #-}
import GHC.Generics
import Data.Aeson.TH
import Data.Aeson.Types
data MyJSONObject = MyJSONObject
{ name :: String
, ptype :: Maybe String
, pid :: Maybe String
, subject :: Maybe String
, message :: Maybe String
} deriving (Show, Generic)
Run Code Online (Sandbox Code Playgroud)
还有更多的Maybe String领域.我FromJSON和ToJSON这个TemplateHaskell功能提供
$(deriveJSON defaultOptions
{
omitNothingFields = True
, fieldLabelModifier = \f -> if
| f == "ptype" -> "type" -- …Run Code Online (Sandbox Code Playgroud) 我试图从Haskell程序中生成一个进程,我想将其标准错误流打印到屏幕,同时也将相同的流写入文件,就像tee命令实现的那样.
我可以打印标准错误流:
import Data.Conduit ((.|), runConduit)
import qualified Data.Conduit.List as CL
import Data.Conduit.Process
main :: IO ()
main = do
(ClosedStream, ClosedStream, err, sph) <- streamingProcess (shell myCommand)
runConduit $ err .| CL.mapM_ print
Run Code Online (Sandbox Code Playgroud)
我可以将流指向文件:
import System.IO (withFile, IOMode (..))
import Data.Conduit.Process
main :: IO ()
main = do
let logFile = "myCommand.log"
withFile logFile WriteMode $ \h -> do
(ClosedStream, ClosedStream, UseProvidedHandle, sph) <-
streamingProcess (shell myCommand) {std_err = UseHandle h}
Run Code Online (Sandbox Code Playgroud)
我怎样才能同时做到这两件事?
我试图使用Sqoop将一个NFL播放结果的MySQL表导入HDFS.我发出以下命令来实现这个目的:
sqoop import \
--connect jdbc:mysql://127.0.0.1:3306/nfl \
--username <username> -P \
--table play
Run Code Online (Sandbox Code Playgroud)
不幸的是,有一些类型的列,TINYINT在导入时会被转换为布尔值.例如,游戏发生在游戏的四分之一处有一个"四分之一"列.如果游戏发生在第一季度,则此列中的值将转换为"真",否则将转换为"假".
事实上,我做了一个sqoop import-all-tables,导入我拥有的整个NFL数据库,它的行为就像这样统一.
有没有解决的办法,或者一些论据import或import-all-tables防止这种情况的发生?
我有一个Servant应用程序和一个端点,该端点在数据库中创建一条记录,然后尝试在S3位置之间复制文件。如果复制失败,我想回滚事务。我有这个接线员
{-# LANGUAGE TemplateHaskell #-}
import Control.Monad.Catch
import Control.Monad.Except
import Control.Monad.Logger
(<??)
:: (MonadError e m, MonadCatch m, MonadLogger m)
=> e
-> m a
-> m a
(<??) err a = a `catchAll` (\e -> $(logErrorSH) e >> throwError err)
infixr 0 <??
Run Code Online (Sandbox Code Playgroud)
捕获所有异常,记录异常的性质,然后引发(在我的情况下,因为我的App类型具有的实例MonadError ServantErr)a ServantErr。
我的处理程序是这样的:
{-# LANGUAGE ScopedTypeVariables #-}
import Control.Monad
import Control.Monad.Catch
import Control.Monad.IO.Class
import qualified Network.AWS as AWS
import Servant
import App.Types
import App.Db
copy :: Copy -> App Text …Run Code Online (Sandbox Code Playgroud)