试图理解IntStream和之间的区别Stream<Integer>。我用下面的例子来说明:
int[] someInts = {1, 2, 3, 4, 5};
var intStream = Arrays.stream(someInts);
var streamInteger = Arrays.stream(someInts).boxed();
Run Code Online (Sandbox Code Playgroud)
两者有什么区别?这种差异与性能有何关系?
我正在尝试做什么
我正在使用PyArrow读取一些 CSV 并将它们转换为 Parquet。我读到的一些文件有很多列并且内存占用很大(足以使运行作业的机器崩溃)。我试图在读取 CSV 时对文件进行分块,其方式与 Pandas read_csv的chunksize工作方式类似。
例如,这就是分块代码在 pandas 中的工作方式:
chunks = pandas.read_csv(data, chunksize=100, iterator=True)
# Iterate through chunks
for chunk in chunks:
do_stuff(chunk)
Run Code Online (Sandbox Code Playgroud)
我想将类似的功能移植到 Arrow
我尝试过做什么
我注意到 Arrow 有ReadOptions其中包含一个block_size参数,我想也许我可以像这样使用它:
# Reading in-memory csv file
arrow_table = arrow_csv.read_csv(
input_file=input_buffer,
read_options=arrow_csv.ReadOptions(
use_threads=True,
block_size=4096
)
)
# Iterate through batches
for batch in arrow_table.to_batches():
do_stuff(batch)
Run Code Online (Sandbox Code Playgroud)
由于这个 ( block_size) 似乎没有返回迭代器,所以我的印象是这仍然会让 Arrow 读取内存中的整个表,从而重新创建我的问题。
最后,我知道我可以首先使用 Pandas 读取 csv 并对其进行分块,然后转换为 Arrow 表。但我试图避免使用 Pandas,只使用 Arrow。 …
我无法理解为什么 getter/setter 方法的名称必须与属性具有相同的名称。
我尝试在这里阅读亚伦·霍尔的答案,但我仍然找不到(或错过了)关于为什么我们必须使用各自名称的解释。
class Car(object):
def __init__(self, color='blue'):
self.color = color
self.current_model = 'Opel'
@property
def model(self, new_model):
return self.current_model
@model.setter
def model(self, new_model):
if new_model == 'Audi':
raise ValueError ('NO AUDI ALLOWED')
else:
self.current_model = new_model
# model = model.setter(models) but doing this works
@model.getter
def model(self):
return self.current_model
Run Code Online (Sandbox Code Playgroud)
编辑: 我发现令人困惑的是,如果我将方法重命名为:
@model.setter
def model_new(self, new_model):
if new_model == 'Audi':
raise ValueError ('NO AUDI ALLOWED')
else:
self.current_model = new_model
Run Code Online (Sandbox Code Playgroud)
我尝试运行:
audi = Car()
audi.model = 'BMW' # …Run Code Online (Sandbox Code Playgroud) 我目前正在尝试对股价烛台进行简单的实现。假设我们有一只名为 XYZ 的股票。该股票接收一系列价格(没有特定频率),其(例如)如下所示:XYZ:[10.2, 10.7, 12, 11 ....]。
目标是记录每一分钟的一些指标,以反映该股票的状态。烛台具有收盘价(一分钟内的最后已知价格)、最高价(一分钟内的最高价格)等指标。
我认为可以实现此目的的一种方法是使用Redis TimeSeries。我考虑这个解决方案是因为我可以在价格流上创建规则,并且每 60 秒它会将一些聚合(例如:最大、最小、第一个......等)刷新到目标存储桶。
我当前使用 Redis TimeSeries (在Python中)作为每个股票价格的蜡烛图的实现看起来像这样(再次使用股票 XYZ 作为示例),并且为了简单起见没有标签:
from redistimeseries.client import Client
r = Client()
r.flushdb()
# Create source & destination buckets
r.create('XYZ_PRICES') # source
r.create(closing_price)
r.create(highest_price)
# Create rules to send data from src -> closing_price & highest_price buckets
r.createrule(src, 'closing_price', 'last', bucket_size_msec=60000)
r.createrule(src, 'highest_price', 'max', bucket_size_msec=60000)
Run Code Online (Sandbox Code Playgroud)
我的问题是:
time-series redis candlestick-chart py-redis redistimeseries
我的目标是能够设置一个 IAM 角色,该角色可以承担某个 IAM 用户的角色。创建角色后,我想稍后再回来修改这个角色,添加外部ID以建立信任关系。让我用一个例子来说明:
假设我想创建角色:
resource "aws_iam_role" "happy_role" {
name = "happy-role"
assume_role_policy = data.aws_iam_policy_document.happy_assume_rule_policy.json
}
Run Code Online (Sandbox Code Playgroud)
我们还假设它happy_assume_role_policy看起来像这样:
data "aws_iam_policy_document" "happy_assume_role_policy" {
statement {
effect = "Allow"
actions = ["sts:AssumeRole"]
principals {
type = "AWS"
identifiers = [var.some_iam_user_arn]
}
}
}
Run Code Online (Sandbox Code Playgroud)
现在,我将使用创建的角色来创建外部集成。但是,一旦创建完该集成,我想返回到最初创建的角色并修改其假定的角色策略。所以现在我想向假设角色策略添加一个条件,使其看起来像:
data "aws_iam_policy_document" "happy_assume_role_policy" {
statement {
effect = "Allow"
actions = ["sts:AssumeRole"]
principals {
type = "AWS"
identifiers = [var.snowflake_iam_user_arn]
}
condition {
test = "StringEquals"
values = [some_integration.integration.external_id]
variable = "sts:ExternalId"
}
}
}
Run Code Online (Sandbox Code Playgroud)
换句话说,我的工作流程应该是这样的: …
我希望能够使用 Fargate 部署 AWS EKS。我已经成功地使部署与node_group. 然而,当我转而使用 Fargate 时,Pod 似乎都陷入了挂起状态。
我正在使用 Terraform 进行配置(不一定是在寻找 Terraform 答案)。这就是我创建 EKS 集群的方式:
module "eks_cluster" {
source = "terraform-aws-modules/eks/aws"
version = "13.2.1"
cluster_name = "${var.project_name}-${var.env_name}"
cluster_version = var.cluster_version
vpc_id = var.vpc_id
cluster_enabled_log_types = ["api", "audit", "authenticator", "controllerManager", "scheduler"]
enable_irsa = true
subnets = concat(var.private_subnet_ids, var.public_subnet_ids)
create_fargate_pod_execution_role = true
write_kubeconfig = false
fargate_pod_execution_role_name = "${var.project_name}-role"
# Assigning worker groups
node_groups = {
my_nodes = {
desired_capacity = 1
max_capacity = 1
min_capacity = 1 …Run Code Online (Sandbox Code Playgroud) 我正在学习 Kubernetes,我的目标是部署 Kubernetes,但通过 terraform(Kubernetes 提供程序)使用 Yaml 文件。让我用一个例子来说明:
据我了解,我可以将 ConfigMap 作为资源,例如:
resource "kubernetes_config_map" "config" {
metadata {
namespace = "metallb_system"
name = "config"
}
data {
config = "${file(${path.module}/config.yml)}"
}
}
Run Code Online (Sandbox Code Playgroud)
现在我想部署一个服务,在 HCL 中如下所示:
resource "kubernetes_service" "nginx" {
metadata {
name = "nginx-example-bla"
}
spec {
selector = {
App = kubernetes_pod.airflow.metadata[0].labels.App
}
port {
port = 80
target_port = 8080
}
type = "LoadBalancer"
}
}
Run Code Online (Sandbox Code Playgroud)
但我想用 yaml 而不是 HCL 来完成。
所以(假设),我希望能够做类似的事情:
resource "kubernetes_service" "nginx" { …Run Code Online (Sandbox Code Playgroud) 构建关于如何在气流中运行 DBT 而不复制我们的 repo 的问题,我目前正在运行气流并通过 git 同步 dag。我正在考虑在我的工作流程中包含 DBT 的不同选项。louis_guitton 的一个建议是将 DBT 项目Docker 化,并通过Docker Operator在 Airflow 中运行它。
我之前没有在 Airflow 或一般 DBT 中使用 Docker Operator 的经验。我想知道是否有人尝试过或可以提供一些有关他们结合该工作流程的经验的见解,我的主要问题是:
我目前正在研究一种将数据分配到分区表的有效方法。是否可以使用 postgres/psql 将数据复制到特定的表分区(而不是使用 INSERT)?
根据此处COPY 的文档:
COPY FROM 可与普通表、外部表或分区表或具有 INSTEAD OF INSERT 触发器的视图一起使用。
根据此处有关分区的文档:
请注意,COPY 会忽略规则。如果要使用 COPY 插入数据,则需要复制到正确的分区表而不是主分区表中。COPY 确实会触发触发器,因此如果您使用触发器方法,则可以正常使用它。
根据我对上述资源的理解,似乎可以复制到分区中;但是,我在网上找不到任何示例或支持。
换句话说,我可以写这样的东西:
COPY some_table_partition_one FROM '/some_dir/some_file'
Run Code Online (Sandbox Code Playgroud) 假设我们有两个案例类 Woof 和 Meow
case class Meow(a: String, b: String)
case class Woof(a: String, b: String)
Run Code Online (Sandbox Code Playgroud)
我们想创建一个函数 foo 将 a 与 b 连接起来,并且对 Woof 和 Meow 都通用(有点),例如:
def foo[T](meowOrWoof: T) = meowOrWoof.a + meowOrWoof.b
Run Code Online (Sandbox Code Playgroud)
当然,这不会编译,因为 a 和 b 都不是 T 的参数。
我尝试过的一件事是创建一个特征,如:
trait Pets[T] {
def foo[T](someClass: T): String
}
case class Meow(a: String, b: String) extends Pets[Meow] {
override def foo[T](someClass: T) = a + b
}
case class Woof(a: String, b: String) extends Pets[Woof] {
override def foo[T](someClass: …Run Code Online (Sandbox Code Playgroud) python ×2
terraform ×2
airflow ×1
amazon-eks ×1
apache-arrow ×1
aws-fargate ×1
dbt ×1
docker ×1
generics ×1
getter ×1
java ×1
kubernetes ×1
postgresql ×1
properties ×1
py-redis ×1
pyarrow ×1
python-3.x ×1
redis ×1
scala ×1
setter ×1
time-series ×1