我开始使用Python.我已经添加了requirements.txt和setup.py我的项目.但是,我仍然对这两个文件的目的感到困惑.我读过这setup.py是为可再发行的东西而requirements.txt设计的,它是为不可再发行的东西而设计的.但我不确定这是否准确.
这两个文件真的打算如何使用?
我要在现有查询中添加行号,以便我可以跟踪已添加到Redis中的数据量.如果我的查询失败,那么我可以从该行开始,而不是在其他表中更新.
查询从表中获取1000行之后的数据
SELECT * FROM (SELECT *, ROW_NUMBER() OVER (Order by (select 1)) as rn ) as X where rn > 1000
Run Code Online (Sandbox Code Playgroud)
查询工作正常.如果有任何方法,我可以获得行没有不使用顺序.
这是什么select 1?
查询是否已优化,或者我可以通过其他方式进行查询.请提供更好的解决方案.
我正在研究Kafka Spark流媒体项目.Spark流式传输从Kafka获取数据.数据采用json格式.样本输入
{"table":"tableA","Product_ID":"AGSVGF.upf","file_timestamp":"2018-07-26T18:58:08.4485558Z000000000000000","hdfs_file_name":"null_1532631600050","Date_Time":"2018 -07-26T13:45:01.0000000Z","User_Name":"UBAHTSD"}
{"table":"tableB","Test_ID":"FAGS.upf","timestamp":"2018-07-26T18:58:08.4485558Z000000000000000","name":"flink","time":"2018 -07-26T13:45:01.0000000Z","Id":"UBAHTGADSGSCVDGHASD"}
一个JSON字符串是一条消息.有15种类型的JSON字符串区分使用表列.现在我想在Apache Hive中保存这15个不同的JSON.所以我创建了一个dstream,在表格列的基础上,我已经过滤了rdd并保存到Hive中.代码工作正常.但是有些时候很多时候会花很多时间来批量生产.我使用控制输入spark.streaming.kafka.maxRatePerPartition=10.我已将rdd重新划分为9分区,但在Spark UI上,它显示未知的阶段.

这是我的代码.
val dStream = dataStream.transform(rdd => rdd.repartition(9)).map(_._2)
dStream.foreachRDD { rdd =>
if (!rdd.isEmpty()) {
val sparkContext = rdd.sparkContext
rdd.persist(StorageLevel.MEMORY_AND_DISK)
val hiveContext = getInstance(sparkContext)
val tableA = rdd.filter(_.contains("tableA"))
if (!tableA.isEmpty()) {
HiveUtil.tableA(hiveContext.read.json(tableA))
tableA.unpersist(true)
}
val tableB = rdd.filter(_.contains("tableB"))
if (!tableB.isEmpty()) {
HiveUtil.tableB(hiveContext.read.json(tableB))
tableB.unpersist(true)
}
.....
.... upto 15 tables
....
val tableK = rdd.filter(_.contains("tableK"))
if (!tableB.isEmpty()) {
HiveUtil.tableB(hiveContext.read.json(tableK))
tableB.unpersist(true)
}
}
}
Run Code Online (Sandbox Code Playgroud)
我如何优化代码?
谢谢.
我有大量的yelp数据,我必须将评论分为8个不同的类别.
分类
Cleanliness
Customer Service
Parking
Billing
Food Pricing
Food Quality
Waiting time
Unspecified
Run Code Online (Sandbox Code Playgroud)
评论包含多个类别,因此我使用了多重分类.但我很困惑我如何处理积极/消极.实例审查可能对食品质量有利,但对客户服务有负面影响.前 - food taste was very good but staff behaviour was very bad. so review contains positive food quality but negative Customer service我该如何处理这个案子?我应该在分类前进行情绪分析吗?请帮我
classification machine-learning sentiment-analysis multilabel-classification multiclass-classification
我正在创建一个应用程序,其中获取进入 kafka 然后进入 spark 的流数据。消费数据,应用一些登录,然后将处理过的数据保存到配置单元中。数据速度非常快。我在 1 分钟内获得了 50K 条记录。火花流中有 1 分钟的窗口,它在其中处理数据并将数据保存在配置单元中。
我的问题是生产前瞻性架构好吗?如果是,我如何将流数据保存到配置单元中。我正在做的是,创建 1 分钟窗口数据的数据框,并使用
results.write.mode(org.apache.spark.sql.SaveMode.Append).insertInto("stocks")
Run Code Online (Sandbox Code Playgroud)
我还没有创建管道。可以吗,还是我必须修改架构?
谢谢
我正在使用 kafka-node link api 来创建 kafka 主题。我没有找到如何使用分区创建 kafka 主题。
var kafka = require('kafka-node'),
Producer = kafka.Producer,
client = new kafka.Client(),
producer = new Producer(client);
// Create topics sync
producer.createTopics(['t','t1'], false, function (err, data) {
console.log(data);
});
// Create topics async
producer.createTopics(['t'], true, function (err, data) {});
producer.createTopics(['t'], function (err, data) {});// Simply omit 2nd arg
Run Code Online (Sandbox Code Playgroud)
如何在 nodejs 中创建带有分区的 kafka 主题。
我正在使用uhopper/hadoopdocker image来创建纱线簇.我有3个节点,每个节点有64GB RAM.我添加了配置.我已经给了纱线32GB.总簇内存为96GB.
- name: YARN_CONF_yarn_scheduler_minimum___allocation___mb
value: "2048"
- name: YARN_CONF_yarn_scheduler_maximum___allocation___mb
value: "16384"
- name: MAPRED_CONF_mapreduce_framework_name
value: "yarn"
- name: MAPRED_CONF_mapreduce_map_memory_mb
value: "8192"
- name: MAPRED_CONF_mapreduce_reduce_memory_mb
value: "8192"
- name: MAPRED_CONF_mapreduce_map_java_opts
value: "-Xmx8192m"
- name: MAPRED_CONF_mapreduce_reduce_java_opts
value: "-Xmx8192m"
- name: YARN_CONF_yarn_nodemanager_resource_memory___mb
value: "32768"
Run Code Online (Sandbox Code Playgroud)
最大应用程序主资源为10240 MB.我运行了5个火花作业,每个3 GB驱动程序内存,2个作业从未进入RUNNING状态,因为10240MB.我无法充分利用我的硬件.
如何增加Max Application Master Resources内存?
我正在使用 openid connect 实施身份验证。我已经在我的网站中成功实现了 openid - connect 。我正在使用众所周知的网址(https://accounts.google.com/.well-known/openid-configuration)来获取端点详细信息,因此方法应该对所有来源(Google、Microsoft 等)通用。
我想使用 okta 对用户进行身份验证。我没有找到 okta 的发现 url,以便我可以获取 okta 的端点详细信息。
谁能给我提供 okta 的知名网址吗?
我正在从事一项在 EMR 上运行的作业,它在 s3 上保存了数千个分区。分区为年/月/日。
我有过去 50 年的数据。现在,当 Spark 写入 10000 个分区时,使用连接大约需要 1 小时s3a。它非常慢。
df.repartition($"year", $"month", $"day").write.mode("append").partitionBy("year", "month", "day").parquet("s3a://mybucket/data")
Run Code Online (Sandbox Code Playgroud)
然后我尝试仅使用 s3 前缀,只花了几分钟就将所有分区保存在 S3 上。
df.repartition($"year", $"month", $"day").write.mode("append").partitionBy("year", "month", "day").parquet("s3://mybucket/data")
Run Code Online (Sandbox Code Playgroud)
当我覆盖 1000 个分区时,s3 与s3a
df
.repartition($"year", $"month", $"day")
.write
.option("partitionOverwriteMode", "dynamic")
.mode("overwrite").partitionBy("year", "month", "day")
.parquet("s3://mybucket/data")
Run Code Online (Sandbox Code Playgroud)
据我了解,s3a更加成熟,目前正在使用。s3/s3n 是旧的连接器,它们已被弃用。所以我想知道该用什么?我应该使用's3`吗?用于将数据保存到 s3 的 EMR 作业的最佳 s3 连接或 s3 URI 是什么?
amazon-s3 amazon-web-services amazon-emr apache-spark parquet
我已成功从Amazon S3读取csv文件.但我有情绪模型的.pkl文件.我想加载这个.pkl文件来预测情绪.这是我的代码 -
import cPickle
import boto3
import pandas as pd
import boto3.session
session = boto3.session.Session(region_name='eu-central-1')
s3client = session.client('s3', config= boto3.session.Config(signature_version='s3v4'),aws_access_key_id='my-ACCESS-KEY-ID',
aws_secret_access_key='my-ACCESS-KEY')
response = s3client.get_object(Bucket='sentiment-data', Key='positive_model_data.pkl')
nb_detector = cPickle.load(open(response['Body']))
nb_predict = nb_detector.predict('food is very good')[0]
print nb_predict
Run Code Online (Sandbox Code Playgroud)
强制转换为Unicode时出错:需要字符串或缓冲区,找到StreamingBody
如何从S3加载pickel文件???
apache-spark ×3
amazon-s3 ×2
apache-kafka ×2
hadoop ×2
python ×2
amazon-emr ×1
docker ×1
hadoop-yarn ×1
hive ×1
node.js ×1
oauth-2.0 ×1
okta ×1
optimization ×1
parquet ×1
pickle ×1
scala ×1
setup.py ×1
sql ×1
sql-server ×1
t-sql ×1