我需要使用以下路由创建 nginx.conf 文件:/assets - 静态文件,/akka - 用于 hello-world akka webapp,/* - 默认页面。我的 default.conf 文件如下所示:
worker_processes 1;
error_log /var/log/nginx/error.log warn;
pid /var/run/nginx.pid;
events {
worker_connections 1024;
}
http {
include mime.types;
default_type application/octet-stream;
upstream hello-akka{
server localhost:9000;
}
server {
listen 80;
location / {
root /usr/share/nginx/html;
index index.html index.htm;
}
location /akka {
proxy_pass http://hello-akka;
}
location /assets {
root /var/www;
}
}
}
Run Code Online (Sandbox Code Playgroud)
每次重新启动 nginx 服务时,都会收到以下错误消息:
nginx: [emerg] "worker_processes" directive is not allowed here in /etc/nginx/conf.d/default.conf:2
nginx: configuration file …Run Code Online (Sandbox Code Playgroud) 我需要屏蔽电话号码.它可能包含数字,+(国家代码)和破折号.国家/地区代码可能包含1位或更多位数字.我创建了这种正则表达式来掩盖除了最后4位之外的所有数字:
inputPhoneNum.replaceAll("\\d(?=\\d{4})", "*");
Run Code Online (Sandbox Code Playgroud)
对于这样的输入:+13334445678
我得到了这个结果:+*******5678
但是,它不适用于此类输入:+ 1-333-444-5678 特别是,它返回相同的数字而不做任何更改.虽然所需的输出屏蔽了除最后4位以外的所有数字,加号和短划线.这就是为什么我想知道如何改变我的正则表达式以包括短划线?我将不胜感激任何帮助!
是否可以使用一些 Spark Conf 设置在本地模式下为 Spark Streaming 应用程序设置多个执行程序?目前,例如,当我将spark.executor.instances参数更改为 4时,我看不到 Spark UI 在性能或执行程序数量增加方面的任何变化。
我已经定义了几个用于JSON表示的案例类,但是由于存在很多嵌套的案例类,因此我不确定我是否正确执行了。诸如spec,meta之类的实体的类型均为JSONObject以及Custom对象本身。
这是我定义的所有类:
case class CustomObject(apiVersion: String,kind: String, metadata: Metadata,spec: Spec,labels: Object,version: String)
case class Metadata(creationTimestamp: String, generation: Int, uid: String,resourceVersion: String,name: String,namespace: String,selfLink: String)
case class Spec(mode: String,image: String,imagePullPolicy: String, mainApplicationFile: String,mainClass: String,deps: Deps,driver: Driver,executor: Executor,subresources: Subresources)
case class Driver(cores: Double,coreLimit: String,memory: String,serviceAccount: String,labels: Labels)
case class Executor(cores: Double,instances: Double,memory: String,labels: Labels)
case class Labels(version: String)
case class Subresources(status: Status)
case class Status()
case class Deps()
Run Code Online (Sandbox Code Playgroud)
这是我需要转换的自定义K8s对象的JSON结构:
{
"apiVersion": "sparkoperator.k8s.io/v1alpha1",
"kind": "SparkApplication",
"metadata": {
"creationTimestamp": "2019-01-11T15:58:45Z",
"generation": 1,
"name": "spark-example", …Run Code Online (Sandbox Code Playgroud) 我一直在尝试使用Kubernetes在本地部署带有架构注册表的Kafka。但是,架构注册表窗格的日志显示此错误消息:
ERROR Server died unexpectedly: (io.confluent.kafka.schemaregistry.rest.SchemaRegistryMain:51)
org.apache.kafka.common.errors.TimeoutException: Timeout expired while fetching topic metadata
Run Code Online (Sandbox Code Playgroud)
这种行为的原因可能是什么?'为了在本地运行Kubernetes,我将Minikube v0.32.0版本与Kubernetes v1.13.0版本一起使用
我的Kafka配置:
apiVersion: v1
kind: Service
metadata:
name: kafka-1
spec:
ports:
- name: client
port: 9092
selector:
app: kafka
server-id: "1"
---
apiVersion: apps/v1
kind: Deployment
metadata:
name: kafka-1
spec:
selector:
matchLabels:
app: kafka
server-id: "1"
replicas: 1
template:
metadata:
labels:
app: kafka
server-id: "1"
spec:
volumes:
- name: kafka-data
emptyDir: {}
containers:
- name: server
image: confluent/kafka:0.10.0.0-cp1
env:
- name: KAFKA_ZOOKEEPER_CONNECT
value: zookeeper-1:2181
- …Run Code Online (Sandbox Code Playgroud) apache-kafka kubernetes apache-zookeeper confluent-schema-registry
我已经使用Kubernetes中的httpfs设置设置了对HDFS的访问权限,因为我需要访问HDFS数据节点,而不仅是名称节点上的元数据。我可以使用带有telnet的节点端口服务连接到HDFS,但是,当我尝试从HDFS中获取一些信息时-读取文件,检查文件是否存在,会出现错误:
[info] java.net.SocketTimeoutException: Read timed out
[info] at java.net.SocketInputStream.socketRead0(Native Method)
[info] at java.net.SocketInputStream.socketRead(SocketInputStream.java:116)
[info] at java.net.SocketInputStream.read(SocketInputStream.java:171)
[info] at java.net.SocketInputStream.read(SocketInputStream.java:141)
[info] at java.io.BufferedInputStream.fill(BufferedInputStream.java:246)
[info] at java.io.BufferedInputStream.read1(BufferedInputStream.java:286)
[info] at java.io.BufferedInputStream.read(BufferedInputStream.java:345)
[info] at sun.net.www.http.HttpClient.parseHTTPHeader(HttpClient.java:735)
[info] at sun.net.www.http.HttpClient.parseHTTP(HttpClient.java:678)
[info] at sun.net.www.protocol.http.HttpURLConnection.getInputStream0(HttpURLConnection.java:1587)
Run Code Online (Sandbox Code Playgroud)
此错误的原因可能是什么?这是用于建立与HDFS文件系统的连接并检查文件是否存在的源代码:
val url = "webhdfs://192.168.99.100:31400"
val fs = FileSystem.get(new java.net.URI(url), new org.apache.hadoop.conf.Configuration())
val check = fs.exists(new Path(dirPath))
Run Code Online (Sandbox Code Playgroud)
由dirPath参数指定的目录位于HDFS上。
HDFS Kubernetes设置如下所示:
apiVersion: v1
kind: Service
metadata:
name: namenode
spec:
type: NodePort
ports:
- name: client
port: 8020
- name: hdfs
port: 50070
nodePort: 30070 …Run Code Online (Sandbox Code Playgroud) 我在大学里有一项任务,我需要使用Median-of-3快速排序按学生的组号对其进行排序。但是我的排序方法无法正常工作-仅对某些元素进行了排序。但是有些要素不会改变立场。我用必要的数据创建了一个Student类,使用Student类型数组的Sortings类。在主类中,我有4种用于此类排序的静态方法,在该类中,我创建了Sortings的实例并调用我的静态方法进行排序。
public static void manual_Sort(Student[]st, int right, int left)
{
int size = right - left + 1;
if(size <= 1)
return;
if(size == 2)
{
if(st[left].Group_number > st[right].Group_number)
swap(st,right,left);
return;
}
else
{
if(st[left].Group_number > st[right-1].Group_number)
swap(st,left, right-1);
if(st[left].Group_number > st[right].Group_number)
swap(st,left, right);
if(st[right-1].Group_number > st[right].Group_number )
swap(st,right-1, right);
}
}
public static int partitionIt(Student[]st, int left, int right,double pivot)
{
int leftPtr = left;
int rightPtr …Run Code Online (Sandbox Code Playgroud) 我正在尝试使用具有指定偏移量的 Spark jdbc 从 MSSQL 数据库读取数据。因此,应该仅在指定的时间戳(即该偏移量)之后加载数据。我尝试通过在 jdbc 配置中提供查询来实现它,但是,我没有找到使用参数化值创建准备好的语句的可能性。在这种情况下,我想参数化一个偏移量,该偏移量在每次应用程序启动后都会改变。如何使用 jdbc 选项来实现它?
所有数据库配置都位于 application.conf 文件中。这是我从数据库读取的方式:
def jdbcOptions(query: String) = Map[String,String](
"driver" -> config.getString("sqlserver.db.driver"),
"url" -> config.getString("sqlserver.db.url"),
"dbtable" -> s"(select * from TestAllData where update_database_time >= '2019-03-19 12:30:00.003') as subq,
"user" -> config.getString("sqlserver.db.user"),
"password" -> config.getString("sqlserver.db.password"),
"customSchema" -> config.getString("sqlserver.db.custom_schema")
)
val testDataDF = sparkSession
.read
.format("jdbc")
.options(jdbcOptions())
.load()
Run Code Online (Sandbox Code Playgroud)
相反,查询应该看起来几乎像这样:
s"(select * from TestAllData where update_database_time >= $tmstp) as subq
Run Code Online (Sandbox Code Playgroud) 我正在尝试使用存储在架构注册表中的 Avro 架构将数据从 Spark 数据帧保存到 HDFS。但是,我在写入数据时遇到错误:
Caused by: org.apache.avro.AvroRuntimeException: Not a union: {"type":"long","logicalType":"timestamp-millis"}
at org.apache.avro.Schema.getTypes(Schema.java:299)
at org.apache.spark.sql.avro.AvroSerializer.org$apache$spark$sql$avro$AvroSerializer$$resolveNullableType(AvroSerializer.scala:229)
at org.apache.spark.sql.avro.AvroSerializer$$anonfun$3.apply(AvroSerializer.scala:209)
at org.apache.spark.sql.avro.AvroSerializer$$anonfun$3.apply(AvroSerializer.scala:208)
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
at scala.collection.immutable.List.foreach(List.scala:392)
at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
at scala.collection.immutable.List.map(List.scala:296)
at org.apache.spark.sql.avro.AvroSerializer.newStructConverter(AvroSerializer.scala:208)
at org.apache.spark.sql.avro.AvroSerializer.<init>(AvroSerializer.scala:51)
at org.apache.spark.sql.avro.AvroOutputWriter.serializer$lzycompute(AvroOutputWriter.scala:42)
at org.apache.spark.sql.avro.AvroOutputWriter.serializer(AvroOutputWriter.scala:42)
at org.apache.spark.sql.avro.AvroOutputWriter.write(AvroOutputWriter.scala:64)
at org.apache.spark.sql.execution.datasources.SingleDirectoryDataWriter.write(FileFormatDataWriter.scala:137)
Run Code Online (Sandbox Code Playgroud)
可能是什么原因?
Avro 架构中的字段如下所示:
{"name":"CreateDate","type":["null",{"type":"long","logicalType":"timestamp-millis"}],"default":null}
Run Code Online (Sandbox Code Playgroud)
以下是日期格式的示例:
1900-01-01 00:00:00
Run Code Online (Sandbox Code Playgroud)
Spark dataframe中该字段的数据类型:
|-- CreateDate: timestamp (nullable = true)
Run Code Online (Sandbox Code Playgroud)
这是我写入数据的方式:
dataDF.write
.mode("append")
.format("avro")
.option(
"avroSchema",
SchemaRegistry.getSchema(
schemaRegistryConfig.url,
schemaRegistryConfig.dataSchemaSubject,
schemaRegistryConfig.dataSchemaVersion))
.save(hdfsURL)
Run Code Online (Sandbox Code Playgroud) 我正在构建集成测试,它将读取在先前测试用例之后生成的数据,并将其与预期结果进行核对。当我运行测试时,尽管生成的数据位于该目录中,但在下一个测试用例中该目录中不可见该生成的数据。当我重新运行测试时,将拾取数据并从目录中读取数据。可能是什么原因呢?测试执行的顺序可能有问题吗?
这是我的测试结果:
class LoaderSpec extends Specification{
sequential
"Loader" should {
"run job from assembled .jar" in {
val res = "sh ./src/test/runLoader.sh".!
res must beEqualTo(0)
}
"write results to the resources" in {
val resultsPath = "/results/loader_result"
resourcesDirectoryIsEmpty(resultsPath) must beFalse
}
"have actual result same as expected one" in {
val expected: Set[String] = readFilesFromDirs("source/loader_source")
println(expected)
val result: Set[String] = readFilesFromDirs("/results/loader_result")
println(result)
expected must beEqualTo(result)
}
}
}
Run Code Online (Sandbox Code Playgroud)
前一个测试成功,而后两个测试失败,因为未找到数据。当我重新运行相同的测试套件而不进行任何更改时,所有测试都将成功。
runLoader.sh脚本:
$SPARK_HOME/bin/spark-submit \
--class "loader.LoaderMain" \
\
--conf "spark.hadoop.fs.gs.impl=com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystem" \
--conf …Run Code Online (Sandbox Code Playgroud) apache-spark ×5
scala ×4
kubernetes ×3
hdfs ×2
java ×2
akka ×1
apache-kafka ×1
avro ×1
case-class ×1
config ×1
eclipse ×1
json ×1
masking ×1
nginx ×1
quicksort ×1
regex ×1
spark-jdbc ×1
specs2 ×1
sql-server ×1
testing ×1