I am trying to read a parquet file which is present in AWS S3 and getting the below error.
17/12/19 11:27:40 DEBUG DAGScheduler: ShuffleMapTask finished on 0
17/12/19 11:27:40 DEBUG DAGScheduler: submitStage(ResultStage 2)
17/12/19 11:27:40 DEBUG DAGScheduler: missing: List(ShuffleMapStage 1)
17/12/19 11:27:40 DEBUG DAGScheduler: submitStage(ShuffleMapStage 1)
17/12/19 11:27:40 DEBUG TaskSchedulerImpl: parentName: , name: TaskSet_1, runningTasks: 2
17/12/19 11:27:40 WARN TaskSetManager: Lost task 2.0 in stage 1.0 (TID 4, ip-xxx-xxx-xxx-xxx.ec2.internal): com.amazonaws.services.s3.model.AmazonS3Exception: Status Code: 403, AWS Service: Amazon S3, AWS Request ID: …Run Code Online (Sandbox Code Playgroud) amazon-s3 cassandra apache-spark parquet spark-cassandra-connector
我试图复制Spring Boot Kotlin示例项目https://github.com/JetBrains/kotlin-examples/tree/master/tutorials/spring-boot-restful。我添加了更多依赖项,当我尝试构建可执行 jar 并运行它时,出现错误:
无法找到或加载主类...
Gradle 构建脚本:
buildscript {
ext.kotlin_version = '1.1.3' // Required for Kotlin integration
ext.spring_boot_version = '1.5.4.RELEASE'
repositories {
jcenter()
}
dependencies {
classpath "org.jetbrains.kotlin:kotlin-gradle-plugin:$kotlin_version" // Required for Kotlin integration
classpath "org.jetbrains.kotlin:kotlin-allopen:$kotlin_version" // See https://kotlinlang.org/docs/reference/compiler-plugins.html#kotlin-spring-compiler-plugin
classpath "org.springframework.boot:spring-boot-gradle-plugin:$spring_boot_version"
}
}
/*plugins {
id 'org.springframework.boot' version '2.0.0.RELEASE'
}*/
apply plugin: 'kotlin' // Required for Kotlin integration
apply plugin: "kotlin-spring" // See https://kotlinlang.org/docs/reference/compiler-plugins.html#kotlin-spring-compiler-plugin
apply plugin: 'org.springframework.boot'
jar {
baseName = 'gs-rest-service'
version = '0.1.0' …Run Code Online (Sandbox Code Playgroud) 我有 3 个 python 类 A、B 和 C。A 包含 B 对象,B 包含 C 的对象。我想要的是当我打印一个类对象时,它应该以下面的格式打印。C类内部也可以有更多的嵌套。
A:
loc : XYZ
qual : ABC
b :
name : ABC
age : 30
c :
address : ABC
phn : 99009
Run Code Online (Sandbox Code Playgroud)
以下是可供参考的类。
class C(object):
def __init__(self):
self.address='ABC'
self.phn=99009
class B(object):
def __init__(self):
self.name='ABC'
self.age=30
self.c = C()
class A(object):
def __init__(self):
self.loc = 'XYZ'
self.qual = 'ABC'
self.b = B()
Run Code Online (Sandbox Code Playgroud) 我正在使用“gosnowflake”驱动程序从我的 Golang 应用程序中查询 Snowflake DB。雪花模式:- NAME STRING AGE INTEGER LOCS ARRAY
Golang 代码:-
package main
import (
"database/sql"
"fmt"
"log"
"strings"
_ "github.com/snowflakedb/gosnowflake"
)
type Person struct {
Name string
Age string
Locs []string
}
var DB *sql.DB
func main() {
connString := "USER" + ":" + "PWD" + "@" + "REGION" + "/" + "TEST"
fmt.Println("DB Connection string..", connString)
DB, _ = sql.Open("snowflake", connString)
defer DB.Close()
QRY := "SELECT NAME,AGE,LOCS FROM TEST.PERSON WHERE NAME='abc'"
result, _ := DB.Query(QRY)
fmt.Println(result) …Run Code Online (Sandbox Code Playgroud) 要求:我有一个 .gz 格式的 Json 文件。因此,当它被压缩时,它的大小约为 500 MB。当我提取它时,json 文件变得接近约 10 GB。提取的 JSON 文件逐行包含单个 JSON 对象。我想要的是ps使用任何 bash 脚本或 python 程序根据字段对文件进行排序。
由于文件太大,不建议将其加载到内存中。因此,我使用 gzcat 和 cat bash 命令来流式传输 JSON 数据,然后将它们通过管道传输到 jq 以进行排序。但是系统在此过程中没有响应,或者我在 output.json 中得到了空文件
>cat sth2.json | parallel --pipe --group --block 1000M --recend '\n}\n' "jq -s -c 'sort_by(.ps) | .[]'" > "output.json"
>gzcat sth2.json.gz | parallel --pipe --group --block 1000M --recend '\n}\n' "jq -s -c 'sort_by(.ps) | .[]'" > "output.json"
Run Code Online (Sandbox Code Playgroud)
硬件:16GB 内存,核心 i5 处理器
示例 JSON 数据:-
{
"ps":"abc"
.... …Run Code Online (Sandbox Code Playgroud) 我正在运行Akka Streams Reactive Kafka应用程序,该应用程序应该在高负载下运行.运行应用程序大约10分钟后,应用程序关闭了OutOfMemoryError.我试图调试堆转储,发现它akka.dispatch.Dispatcher占用了大约5GB的内存.以下是我的配置文件.
Akka版本:2.4.18
Reactive Kafka版本:2.4.18
1 application.conf.:
consumer {
num-consumers = "2"
c1 {
bootstrap-servers = "localhost:9092"
bootstrap-servers=${?KAFKA_CONSUMER_ENDPOINT1}
groupId = "testakkagroup1"
subscription-topic = "test"
subscription-topic=${?SUBSCRIPTION_TOPIC1}
message-type = "UserEventMessage"
poll-interval = 100ms
poll-timeout = 50ms
stop-timeout = 30s
close-timeout = 20s
commit-timeout = 15s
wakeup-timeout = 10s
use-dispatcher = "akka.kafka.default-dispatcher"
kafka-clients {
enable.auto.commit = true
}
}
Run Code Online (Sandbox Code Playgroud)
2 . build.sbt:
java -Xmx6g \
-Dcom.sun.management.jmxremote.port=27019 \
-Dcom.sun.management.jmxremote.authenticate=false \
-Dcom.sun.management.jmxremote.ssl=false \
-Djava.rmi.server.hostname=localhost \
-Dzookeeper.host=$ZK_HOST \ …Run Code Online (Sandbox Code Playgroud) 我正在尝试从我的 python 应用程序中构建 docker 图像。但是当我将“snowflake-connector-python”添加到 pip requirements.txt 文件时,docker build 失败了。
要求.txt:-
boto3
pycryptodome
snowflake-connector-python
Run Code Online (Sandbox Code Playgroud)
Dockerfile:-
FROM python:alpine3.6
RUN python --version
COPY . /sample-app
WORKDIR /sample-app
RUN python -m pip install --upgrade pip
RUN pip install --upgrade setuptools
RUN pip --version
RUN pip install -r requirements.txt
CMD [ "python", "runner.py" ]
Run Code Online (Sandbox Code Playgroud)
CLI docker 构建:-
docker build -t sample-app:1.0 .
Run Code Online (Sandbox Code Playgroud)
错误 :-
Step 8/9 : RUN pip install -r requirements.txt
---> Running in 13ab5873e0b5
Collecting boto3 (from -r requirements.txt (line 1))
Downloading …Run Code Online (Sandbox Code Playgroud) 要求:
我想对JsonNode. 功能可以不同,例如:-lowercasing某些值或将某些内容附加到这些值或用某些内容替换这些值。如何使用Jackson库实现这一目标?请注意,JSON 数据的结构可能不同,这意味着我想构建一个通用系统,该系统将接受一些路径表达式,这将基本上决定在哪里更改。我想使用函数式编程风格,以便我可以将这些函数作为参数传递。
例如:
输入:
{
"name": "xyz",
"values": [
{
"id": "xyz1",
"sal": "1234",
"addresses": [
{
"id": "add1",
"name": "ABCD",
"dist": "123"
},
{
"id": "add2",
"name": "abcd3",
"dist": "345"
}
]
},
{
"id": "xyz2",
"sal": "3456",
"addresses": [
{
"id": "add1",
"name": "abcd",
"dist": "123"
},
{
"id": "add2",
"name": "XXXXX",
"dist": "345"
}
]
}
]
}
Run Code Online (Sandbox Code Playgroud)
在这种情况下,我基本上必须使用两个函数,lowercase()并且convert_to_number(). 我想申请lowercase()上的所有函数"name"内的所有属性"addresses" …
我想将旧的cassandra集群数据迁移到新集群,并考虑编写一些spark作业来实现。有什么方法可以与来自同一SparkContext的多个cassandra集群进行交互。这样我就可以使用同一sparkJob中的saveToCassandra函数从一个群集读取数据并写入另一个群集。
val products = sc.cassandraTable("first_cluster","products").cache()
products.saveToCassandra("diff_cluster","products2")
Run Code Online (Sandbox Code Playgroud)
我们可以将数据保存到其他群集中吗?
apache-spark ×2
cassandra ×2
json ×2
python ×2
snowflake-cloud-data-platform ×2
akka ×1
akka-stream ×1
amazon-s3 ×1
bash ×1
docker ×1
dockerfile ×1
go ×1
gradle ×1
jackson ×1
java ×1
jq ×1
jsonpath ×1
kotlin ×1
parquet ×1
pretty-print ×1
python-3.x ×1
scala ×1
sorting ×1
spring-boot ×1