我正在研究 Kafka 流媒体并尝试将其与 Apache Spark 集成。但是,在运行时我遇到了问题。我收到以下错误。
这是我正在使用的命令。
df_TR = Spark.readStream.format("kafka").option("kafka.bootstrap.servers", "localhost:9092").option("subscribe", "taxirides").load()
错误:
Py4JJavaError:调用o77.load时出错。:java.lang.ClassNotFoundException:找不到数据源:kafka。请在http://spark.apache.org/third-party-projects.html找到软件包
我该如何解决这个问题?
注意:我在 Jupyter Notebook 中运行它
findspark.init('/home/karan/spark-2.1.0-bin-hadoop2.7')
import pyspark
from pyspark.sql import SparkSession
Spark = SparkSession.builder.appName('KafkaStreaming').getOrCreate()
from pyspark.sql.types import *
from pyspark.streaming import StreamingContext
from pyspark.streaming.kafka import KafkaUtils
Run Code Online (Sandbox Code Playgroud)
一切都运行良好,直到这里(上面的代码)
df_TR = Spark.readStream.format("kafka").option("kafka.bootstrap.servers", "localhost:9092").option("subscribe", "taxirides").load()
这就是出错的地方(上面的代码)。
我正在关注的博客:https : //www.adaltas.com/en/2019/04/18/spark-streaming-data-pipelines-with-structured-streaming/
我试图获取过去的数据。假设今天在雇员表中,当我单击时,我可以看到1000名雇员。但这不是昨天的情况。今天有2名新员工加入,因此我的人数为1000。如果我想昨天休息一下,那么过去60天应获得998等。
我已经尝试了UNION ALL。但是,这将无济于事。
/* Create Table */
CREATE TABLE dbo.EMPTable (ID INT, H_Date DATE)
/* Load Data */
INSERT INTO dbo.EMPTable VALUES (1,'2019-04-17')
INSERT INTO dbo.EMPTable VALUES (2,'2019-04-17')
INSERT INTO dbo.EMPTable VALUES (3,'2019-04-16')
INSERT INTO dbo.EMPTable VALUES (4,'2019-04-16')
INSERT INTO dbo.EMPTable VALUES (5,'2019-04-15')
INSERT INTO dbo.EMPTable VALUES (6,'2019-04-15')
INSERT INTO dbo.EMPTable VALUES (7,'2019-04-15')
INSERT INTO dbo.EMPTable VALUES (8,'2019-04-14')
INSERT INTO dbo.EMPTable VALUES (9,'2019-04-14')
INSERT INTO dbo.EMPTable VALUES (10,'2019-04-14')
INSERT INTO dbo.EMPTable VALUES (11,'2019-04-14')
INSERT INTO dbo.EMPTable VALUES (12,'2019-04-14')
INSERT INTO …Run Code Online (Sandbox Code Playgroud)