我正在使用 Spark Streaming 从 kafka 主题创建一个 Dataframe。我想将 Dataframe 写入 Kinesis Producer。据我所知,目前还没有官方的 API。但互联网上有多个可用的 API,但遗憾的是,它们都不适合我。火花版本:2.2 斯卡拉:2.11
我尝试使用https://github.com/awslabs/kinesis-kafka-connector并构建 jar。但由于此 jar 和 Spark API 之间的包名称冲突而出现错误。请帮忙。
########## 这是其他人的代码:spark-shell --jars spark-sql-kinesis_2.11-2.2.0.jar,spark-sql-kafka-0-10_2.11-2.1.0.jar,spark-streaming-kafka-0-10-assembly_2.10-2.1.0.jar --files kafka_client_jaas_spark.conf --properties-file gobblin_migration.conf --conf spark.port.maxRetries=100 --driver-java-options "-Djava.security.auth.login.config=kafka_client_jaas_spark.conf" --conf "spark.driver.extraJavaOptions=-Djava.security.auth.login.config=kafka_client_jaas_spark.conf" --conf "spark.executor.extraJavaOptions=-Djava.security.auth.login.config=kafka_client_jaas_spark.conf"
import java.io.File
import org.apache.commons.lang3.exception.ExceptionUtils
import org.apache.spark.SparkException
import org.apache.spark.deploy.SparkHadoopUtil
import org.apache.spark.sql.functions._
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.types._
import scala.sys.process._
import org.apache.log4j.{ Logger, Level, LogManager, PropertyConfigurator }
import org.apache.spark.sql.streaming.Trigger
val streamingInputDF =spark.readStream.format("kafka").option("kafka.bootstrap.servers","bootstrap server").option("subscribe", "<kafkatopic>").option("startingOffsets", "latest").option("failOnDataLoss", "false").option("kafka.security.protocol", "SASL_PLAINTEXT").load()
val xdf=streamingInputDF.select(col("partition").cast("String").alias("partitionKey"),col("value").alias("data"))
xdf.writeStream.format("kinesis").option("checkpointLocation", "<hdfspath>").outputMode("Append").option("streamName", "kinesisstreamname").option("endpointUrl","kinesisendpoint").option("awsAccessKeyId", "accesskey").option("awsSecretKey","secretkey").start().awaitTermination()
Run Code Online (Sandbox Code Playgroud)
对于 …
scala apache-kafka apache-spark amazon-kinesis kafka-consumer-api