如何使用 Pyspark 通过 SSH 隧道读取 MySQL 数据库?

Leo*_*e S 5 mysql apache-spark pyspark

我对 Pyspark 这种奇怪的行为感到困惑,但在互联网上找不到任何东西。

我们在专用网络中有一个 MySQL 集群,可以通过 SSH 隧道访问它。我正在尝试使用 Pyspark 和 SSHTunnelForwarder 从该数据库读取数据,我这样做是这样的:

  1. 创建隧道:
server = SSHTunnelForwarder(
        (target_tunnel_ip_address, 22),
        ssh_username=tunnel_username",
        ssh_private_key=private_key_filepath",
        remote_bind_address=(mysql_address, 3306)
        )

server.start()
Run Code Online (Sandbox Code Playgroud)
  1. 使用数据库信息创建 JDBC URL,如下所示:
hostname = "localhost" #Because I forwarded I forwarded the remote port to my localhost
port = server.local_bind_port #To access which port the SSHTunnelForwarder used
username = my_username
password = my_password
database = my_database
jdbcUrl = "jdbc:mysql://{}:{}/{}?user={}&password={}".format(hostname, port, database, username, password)
Run Code Online (Sandbox Code Playgroud)
  1. 从数据库中读取数据:
data = spark.read \
                  .format("jdbc") \
                  .option("url", jdbcUrl) \
                  .option("driver", "com.mysql.cj.jdbc.Driver") \
                  .option("query", query) \
                  .load()
Run Code Online (Sandbox Code Playgroud)

到目前为止一切顺利,这似乎有效,我可以看到表列:[变量数据的输出][1][1]:https://i.stack.imgur.com/YJhCC.png

DataFrame[id: int, company_id: int, life_id: int, type_id: int, cep: string, address: string, number: int, complement: string, neighborhood: string, city: string, state: string, origin: string, created_at: timestamp, updated_at: timestamp, active: boolean]
Run Code Online (Sandbox Code Playgroud)

但是,一旦我调用任何实际读取数据的方法,例如 .head()、.collect() 或其他变体,我就会收到此错误:

org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 7629.0 failed 4 times, most recent failure: Lost task 0.3 in stage 7629.0 (TID 11996, XX.XXX.XXX.XXX, executor 0): com.mysql.cj.jdbc.exceptions.CommunicationsException: Communications link failure
Run Code Online (Sandbox Code Playgroud)

有谁知道为什么会发生这种情况以及如何解决它?

小智 1

该代码在驱动程序中执行,但任务在执行程序上运行,当您引用“localhost”时,每个执行程序将自行解释并无法连接。
相反,获取驱动程序的主机名(例如 socket.gethostname())并在 JDBC URL 中使用它