我是 Kafka 新手,使用融合的 kafka 并尝试使用带有 'sasl.mechanism': 'PLAIN','security.protocol': 'SASL_SSL' 的 python 生产者代码从 AWS EC2 实例将消息写入现有的 kafka 主题。我尝试过此链接中的生产者示例。出现以下错误。如果有任何可以指导我什么会导致这个问题将会有很大的帮助。
Failed to deliver message: KafkaError{code=_MSG_TIMED_OUT,val=-192,str="Local: Message timed out"}
Run Code Online (Sandbox Code Playgroud) 我正在尝试从 Google Drive 下载特定文件夹。
我试过这个例子 http://www.mwclearning.com/?p=1608但它从 G-Drive 下载所有文件。
例如:如果我在 Google Drive 中有两个文件夹,请说..
如果我想下载文件夹 A 则只应下载 1 、 2 个文件。
任何建议或帮助都可能非常有帮助。
提前致谢。
我试图将回声值分配给一个变量,但我收到错误
Var='(echo $2 | sed -e 's/,/: chararray /g'| sed -e 's/$/: chararray/')'
echo $var
Input : sh load.sh file 1,2,3,4
Error load.sh: line 1: chararray: command not found
Run Code Online (Sandbox Code Playgroud) 如何将记录追加到现有的分区 Hive 表?例如,我有一个名为“ip_country”的现有外部表,数据集是 testdata1。如果数据集增长,比如我第二天的数据集是 testdata1 和 testdata2,那么如何将新数据即“testdata2”附加到“ip_country”配置单元表。
我正在尝试使用 regexp_replace 替换数据框列中的字符串。我必须将正则表达式模式应用于数据框列中的所有记录。但是字符串没有按预期替换。
from pyspark import SparkContext, SparkConf
from pyspark.sql import SQLContext
from pyspark import sql
from pyspark.sql.functions import regexp_replace,col
import re
conf = SparkConf().setAppName("myFirstApp").setMaster("local")
sc = SparkContext(conf=conf)
sqlContext = sql.SQLContext(sc)
df=sc.parallelize([('2345','ADVANCED by John'),
('2398','ADVANCED by ADVANCE'),
('2328','Verified by somerandomtext'),
('3983','Double Checked by Marsha')]).toDF(['ID', "Notes"])
reg_patterns=["ADVANCED|ADVANCE/ADV/","ASSOCS|AS|ASSOCIATES/ASSOC/"]
for i in range(len(reg_patterns)):
res_split=re.findall(r"[^/]+",reg_patterns[i])
res_split[0]
df=df.withColumn('NotesUPD',regexp_replace(col('Notes'),res_split[0],res_split[1]))
df.show()
Run Code Online (Sandbox Code Playgroud)
输出 :
+----+--------------------+--------------------+
| ID| Notes| NotesUPD|
+----+--------------------+--------------------+
|2345| ADVANCED by John| ADVANCED by John|
|2398| ADVANCED by ADVANCE| ADVANCED by ADVANCE|
|2328|Verified by somer...|Verified by …Run Code Online (Sandbox Code Playgroud)