I want to use Kafka Connect to sink data from Kafka into an S3 bucket. I already have a Kafka topic running and I have created an S3 bucket. My topic contains data in Protobuf format. I tried https://github.com/qubole/streamx and got the following error:
[2018-10-04 13:35:46,512] INFO Revoking previously assigned partitions [] for group connect-s3-sink (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator:280)
[2018-10-04 13:35:46,512] INFO (Re-)joining group connect-s3-sink (org.apache.kafka.clients.consumer.internals.AbstractCoordinator:326)
[2018-10-04 13:35:46,645] INFO Successfully joined group connect-s3-sink with generation 1 (org.apache.kafka.clients.consumer.internals.AbstractCoordinator:434)
[2018-10-04 13:35:46,692] INFO Setting newly assigned partitions [ssp.impressions-11, ssp.impressions-10, ssp.impressions-7, ssp.impressions-6, ssp.impressions-9, ssp.impressions-8, ssp.impressions-3, ssp.impressions-2, ssp.impressions-5, ssp.impressions-4, ssp.impressions-1, ssp.impressions-0] for Group connect-s3-sink(org.apache.kafka.clients.consumer.internals.ConsumerCoordinator:219)
[2018-10-04 13:35:47,193] ERROR Task s3-sink-0 threw an uncaught and unrecoverable exception (org.apache.kafka.connect.runtime.WorkerTask:142)
java.lang.NullPointerException
at io.confluent.connect.hdfs.HdfsSinkTask.close(HdfsSinkTask.java:122)
at org.apache.kafka.connect.runtime.WorkerSinkTask.commitOffsets(WorkerSinkTask.java:290)
at org.apache.kafka.connect.runtime.WorkerSinkTask.closePartitions(WorkerSinkTask.java:421)
at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:146)
at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:140)
at …
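For reference, the same Kafka-to-S3 pipeline is often built with Confluent's kafka-connect-s3 connector instead of streamx. The sketch below is only an illustration of how such a connector is registered through the Kafka Connect REST API from Python, not a fix for the NullPointerException above; the Connect endpoint, bucket name, region and flush size are assumptions, and because the topic holds Protobuf payloads it writes the raw bytes using ByteArrayConverter/ByteArrayFormat.

# Sketch: register Confluent's S3 sink connector via the Kafka Connect REST API.
# The endpoint, bucket, region and flush size are assumed values; adjust to your setup.
import requests  # assumes the 'requests' package is installed

connector = {
    "name": "s3-sink",
    "config": {
        "connector.class": "io.confluent.connect.s3.S3SinkConnector",
        "tasks.max": "1",
        "topics": "ssp.impressions",
        "s3.bucket.name": "my-bucket",      # hypothetical bucket name
        "s3.region": "eu-west-1",           # hypothetical region
        "storage.class": "io.confluent.connect.s3.storage.S3Storage",
        # Protobuf payloads can be written as raw bytes and decoded downstream
        "format.class": "io.confluent.connect.s3.format.bytearray.ByteArrayFormat",
        "key.converter": "org.apache.kafka.connect.converters.ByteArrayConverter",
        "value.converter": "org.apache.kafka.connect.converters.ByteArrayConverter",
        "flush.size": "1000",
    },
}

resp = requests.post("http://localhost:8083/connectors", json=connector)  # assumed Connect REST endpoint
print(resp.status_code, resp.text)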
I want to run a Spark process directly on the cluster from IntelliJ IDEA, so I followed this documentation: https://docs.azuredatabricks.net/user-guide/dev-tools/db-connect.html
After configuring everything I run databricks-connect test, but I do not get the Scala REPL that the documentation describes.
Here is my cluster configuration:
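Independently of the Scala REPL that databricks-connect test should open, a tiny PySpark job run from the IDE can confirm whether the connection to the cluster works at all. This is only a sketch and assumes databricks-connect configure has already been completed with this cluster's settings:

# Minimal databricks-connect sanity check (a sketch; run it from the IDE after
# `databricks-connect configure`). With databricks-connect installed, this
# SparkSession is backed by the remote Databricks cluster.
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()
print(spark.range(100).count())  # should print 100 if the cluster connection works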
I want to join two streams, but I get the following error and I do not know how to fix it:
Append output mode not supported when there are streaming aggregations on streaming DataFrames/DataSets without watermark;;
Join Inner
df_stream = spark.readStream.schema(schema_clicks).option("ignoreChanges", True).option("header", True).format("csv").load("s3://mybucket/*.csv")
display(df_stream.select("SendID", "EventType", "EventDate"))
I want to join df1 with df2:
# imports used below
from pyspark.sql.functions import col, expr, unix_timestamp, max
from pyspark.sql.types import TimestampType

df1 = df_stream \
.withColumn('timestamp', unix_timestamp(col('EventDate'), "MM/dd/yyyy hh:mm:ss aa").cast(TimestampType())) \
.select(col("SendID"), col("timestamp"), col("EventType")) \
.withColumnRenamed("SendID", "SendID_update") \
.withColumnRenamed("timestamp", "timestamp_update") \
.withWatermark("timestamp_update", "1 minutes")
df2 = df_stream \
.withColumn('timestamp', unix_timestamp(col('EventDate'), "MM/dd/yyyy hh:mm:ss aa").cast(TimestampType())) \
.withWatermark("timestamp", "1 minutes") \
.groupBy(col("SendID")) \
.agg(max(col('timestamp')).alias("timestamp")) \
.orderBy('timestamp', ascending=False)
join = df2.alias("A").join(df1.alias("B"), expr(
"A.SendID = B.SendID_update" +
" AND " +
"B.timestamp_update >= A.timestamp " + …Run Code Online (Sandbox Code Playgroud) 我需要找到具有特定前缀的所有文件。例如:
I need to find all the files that match a certain prefix. For example:
raw/client/Hist/2017/*/*/Tracking_*.zip
I tried this line of code, but it does not work:
raw/client/Hist/2017/*/*/Tracking_*.zip
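If the files live in S3 (an assumption here), one way to resolve a wildcard pattern like this is to list the keys under the literal part of the prefix and then apply the glob locally, since the S3 list API only understands plain prefixes. A sketch with boto3, where the bucket name is hypothetical:

# Sketch: list keys under the fixed prefix with boto3 and filter them with fnmatch,
# because S3 itself only matches literal prefixes, not glob patterns.
# The bucket name "mybucket" is hypothetical.
import fnmatch
import boto3

s3 = boto3.client("s3")
pattern = "raw/client/Hist/2017/*/*/Tracking_*.zip"

matches = []
for page in s3.get_paginator("list_objects_v2").paginate(Bucket="mybucket",
                                                          Prefix="raw/client/Hist/2017/"):
    for obj in page.get("Contents", []):
        if fnmatch.fnmatch(obj["Key"], pattern):
            matches.append(obj["Key"])

print(matches)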
I have zip files stored in Amazon S3, and I have a Python list such as ["s3://mybucket/file1.zip", ..., "s3://mybucket/fileN.zip"]. I need to unzip all of these files using a Spark cluster and store all the CSV files in a Delta-format table. I would like to know a faster way to process them than my current approach:
1) I have a loop that iterates over my Python list.
2) I am using Python Boto3 to get each zip file from S3 with s3.bucket.Object(file).
3) I am unzipping the files with the following code:
import io
import boto3
import shutil
import zipfile

# s3 (a boto3 resource) and outputZip (a local staging directory on the driver)
# are defined earlier in the notebook
for file in ["s3://mybucket/file1.zip", ..., "s3://mybucket/fileN.zip"]:
    obj = s3.bucket.Object(file)
    # read the whole zip into memory and extract it onto the driver's local disk
    with io.BytesIO(obj.get()["Body"].read()) as tf:
        tf.seek(0)
        with zipfile.ZipFile(tf, mode='r') as zipf:
            for subfile in zipf.namelist():
                zipf.extract(subfile, outputZip)
    # copy the extracted files from the driver's local filesystem to DBFS, then clean up
    dbutils.fs.cp("file:///databricks/driver/{0}".format(outputZip), "dbfs:" + outputZip, True)
    shutil.rmtree(outputZip)
    dbutils.fs.rm("dbfs:" + outputZip, True)
4) My files are unzipped on the driver node, and then the executors cannot access those files (I have not found a way to do it), so I use all these CSV …
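For reference, a sketch of one way to move the unzip work off the driver: distribute the list of zip URIs as an RDD so that each task downloads and decompresses its file in memory on an executor and emits the CSV lines, which spark.read.csv can then parse from an RDD of strings. It assumes boto3 and S3 credentials are available on the workers, that every file inside the zips is a CSV with the same layout, and the Delta output path is hypothetical.

# Sketch: unzip in parallel on the executors instead of on the driver.
# Assumes boto3 and S3 credentials are available on the workers; the Delta path is hypothetical.
import io
import zipfile

import boto3

def csv_lines(s3_uri):
    # runs on an executor: download the zip into memory and stream out its CSV lines
    bucket, key = s3_uri.replace("s3://", "").split("/", 1)
    body = boto3.client("s3").get_object(Bucket=bucket, Key=key)["Body"].read()
    with zipfile.ZipFile(io.BytesIO(body)) as zf:
        for name in zf.namelist():
            if name.lower().endswith(".csv"):
                for line in io.TextIOWrapper(zf.open(name), encoding="utf-8"):
                    yield line.rstrip("\n")

zip_uris = ["s3://mybucket/file1.zip", "s3://mybucket/fileN.zip"]  # the full list from above
lines = spark.sparkContext.parallelize(zip_uris, len(zip_uris)).flatMap(csv_lines)

df = spark.read.csv(lines)  # add header/schema options to match the files
df.write.format("delta").mode("append").save("/mnt/delta/tracking")  # hypothetical target path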
I get the following error when compiling my Scala code. How can I fix it? My Java version is "1.8.0_231". My Scala version is: Scala code runner version 2.11.8 -- Copyright 2002-2016, LAMP/EPFL.
[error] (commonsUtils/Compile/compileIncremental) Error compiling sbt component 'compiler-bridge_2.11'
The full trace is as follows:
[info] Non-compiled module 'compiler-bridge_2.11' for Scala 2.11.8. Compiling...
error: scala.reflect.internal.MissingRequirementError: object java.lang.Object in compiler mirror not found.
at scala.reflect.internal.MissingRequirementError$.signal(MissingRequirementError.scala:17)
at scala.reflect.internal.MissingRequirementError$.notFound(MissingRequirementError.scala:18)
at scala.reflect.internal.Mirrors$RootsBase.getModuleOrClass(Mirrors.scala:53)
at scala.reflect.internal.Mirrors$RootsBase.getModuleOrClass(Mirrors.scala:45)
at scala.reflect.internal.Mirrors$RootsBase.getModuleOrClass(Mirrors.scala:45)
at scala.reflect.internal.Mirrors$RootsBase.getModuleOrClass(Mirrors.scala:66)
at scala.reflect.internal.Mirrors$RootsBase.getClassByName(Mirrors.scala:102)
at scala.reflect.internal.Mirrors$RootsBase.getRequiredClass(Mirrors.scala:105)
at scala.reflect.internal.Definitions$DefinitionsClass.ObjectClass$lzycompute(Definitions.scala:257)
at scala.reflect.internal.Definitions$DefinitionsClass.ObjectClass(Definitions.scala:257)
at scala.reflect.internal.Definitions$DefinitionsClass.init(Definitions.scala:1394)
at scala.tools.nsc.Global$Run.<init>(Global.scala:1215)
at scala.tools.nsc.Driver.doCompile(Driver.scala:31)
at scala.tools.nsc.MainClass.doCompile(Main.scala:23)
at scala.tools.nsc.Driver.process(Driver.scala:51)
at scala.tools.nsc.Main.process(Main.scala)
at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.base/java.lang.reflect.Method.invoke(Method.java:566)
at sbt.internal.inc.RawCompiler.getReporter$1(RawCompiler.scala:56)
at sbt.internal.inc.RawCompiler.apply(RawCompiler.scala:77)
at sbt.internal.inc.AnalyzingCompiler$.$anonfun$compileSources$6(AnalyzingCompiler.scala:363) …

I have the following string, without quotes and containing arrays and nested dictionaries:
s ='{source: [s3, kinesis], aws_access_key_id: {myaws1, myaws2}, aws_secret_access_key: REDACTED_POSSIBLE_AWS_SECRET_ACCESS_KEY, bucketName: bucket, region_name: eu-west-1, fileType: zip, typeIngestion: FULL, project: trackingcampaigns, functionalArea: client, filePaths: [s3Sensor/2018/], prefixFiles: [Tracking_Sent, Tracking_Bounces, Tracking_Opens, Tracking_Clicks, Tracking_SendJobs], prefixToTables: {Tracking_Bounces: MNG_TRACKING_EXTRACT_BOUNCES_3, Tracking_Sent: MNG_TRACKING_EXTRACT_SENT_3, Tracking_Clicks: MNG_TRACKING_EXTRACT_CLICKS_3, Tracking_Opens: MNG_TRACKING_EXTRACT_OPENS_3, Tracking_SendJobs: MNG_TRACKING_EXTRACT_SENDJOBS_3}, stagingPath: /zipFiles/}'
I want to convert it into a dictionary.
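One possible approach, a sketch that relies on the string being valid YAML flow syntax (which this example appears to be): PyYAML accepts the unquoted keys and values, the [...] lists and the nested {...} blocks, so yaml.safe_load can turn it into a Python dictionary. Note that an entry like {myaws1, myaws2} comes back as keys mapped to None.

# Sketch: parse the unquoted string as YAML flow syntax (assumes PyYAML is installed).
import yaml

d = yaml.safe_load(s)
print(d["source"])                              # ['s3', 'kinesis']
print(d["prefixToTables"]["Tracking_Bounces"])  # 'MNG_TRACKING_EXTRACT_BOUNCES_3'
print(d["aws_access_key_id"])                   # {'myaws1': None, 'myaws2': None}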
I am using Azure Databricks with Scala. I want to show() a DataFrame, but I get an error I cannot understand and I would like to solve it. My lines of code are:
println("----------------------------------------------------------------Printing schema")
df.printSchema()
println("----------------------------------------------------------------Printing dataframe")
df.show()
println("----------------------------------------------------------------Error before")
The stdout is shown below; note that the "----------------------------------------------------------------Error before" message never appears.
> ----------------------------------------------------------------Printing schema
> root
> |-- processed: integer (nullable = false)
> |-- processDatetime: string (nullable = false)
> |-- executionDatetime: string (nullable = false)
> |-- executionSource: string (nullable = false)
> |-- executionAppName: string (nullable = false)
>
> ----------------------------------------------------------------Printing dataframe
> 2020-02-18T14:19:00.069+0000: [GC (Allocation Failure) [PSYoungGen: 1497248K->191833K(1789440K)] 2023293K->717886K(6063104K),
> 0.0823288 secs] [Times: user=0.18 sys=0.02, real=0.09 secs]
> 2020-02-18T14:19:40.823+0000: [GC …
apache-spark ×3
amazon-s3 ×2
scala ×2
apache-kafka ×1
boto3 ×1
databricks ×1
delta-lake ×1
java ×1
sbt ×1
zip ×1