是否有一种优雅的方式来查询特定记录的Kafka主题?我正在构建的REST API获取一个ID,需要在Kafka主题中查找与该ID相关联的记录.一种方法是通过自定义消费者检查主题中的每条记录并查找匹配项,但我想避免读取大量记录的开销.Kafka是否具有快速,内置的过滤功能?
我想使用R的分布式计算tm包(称为tm.plugin.dc)创建一个包含1亿条推文的文本语料库.这些推文存储在我笔记本电脑上的一个大型MySQL表中.我的笔记本电脑很旧,所以我使用的是我在Amazon EC2上设置的Hadoop集群.
CRAN的tm.plugin.dc文档说目前只支持DirSource.文档似乎表明DirSource每个文件只允许一个文档.我需要语料库将每条推文视为文档.我有1亿条推文 - 这是否意味着我需要在旧笔记本上制作1亿个文件?这似乎过分了.有没有更好的办法?
到目前为止我尝试了什么:
将MySQL表的文件转储作为单个(大量).sql文件.将文件上传到S3.将文件从S3传输到群集.使用Cloudera的Sqoop工具将文件导入Hive.怎么办?我无法弄清楚如何使DirSource与Hive一起工作.
在我的笔记本电脑上发送每条推文的XML文件.但是怎么样?我的电脑很旧,无法做到这一点....如果我能够超越它,那么我会:将所有1亿个XML文件上传到亚马逊S3中的文件夹.将S3文件夹复制到Hadoop集群.将DirSource指向该文件夹.
我使用 Spark 执行加载到 Redshift 中的数据转换。Redshift 不支持 NaN 值,所以我需要用 NULL 替换所有出现的 NaN。
我试过这样的事情:
some_table = sql('SELECT * FROM some_table')
some_table = some_table.na.fill(None)
Run Code Online (Sandbox Code Playgroud)
但我收到以下错误:
ValueError: value 应该是 float、int、long、string、bool 或 dict
所以它似乎na.fill()不支持无。我特别需要替换为NULL,而不是其他值,例如0.
如何充分利用我的每个EC2内核?
我正在使用c4.4xlarge AWS Ubuntu EC2实例和TensorFlow来构建一个大型复杂的神经网络.nproc说我的EC2实例有16个核心.当我运行我的convnet培训代码时,顶级实用程序说我只使用400%的CPU.由于有16个内核,我原以为它使用1600%的CPU.AWS EC2监控选项卡确认我只使用了25%的CPU容量.这是一个巨大的网络,在我的新Mac Pro上,它消耗大约600%的CPU并且需要几个小时来构建,所以我不认为原因是因为我的网络太小.
我相信下面这一行最终决定了CPU的使用情况:
sess = tf.InteractiveSession(config=tf.ConfigProto())
Run Code Online (Sandbox Code Playgroud)
我承认我并不完全理解线程和内核之间的关系,但我尝试增加内核数量.它与上面的行具有相同的效果:仍然是400%的CPU.
NUM_THREADS = 16
sess = tf.InteractiveSession(config=tf.ConfigProto(intra_op_parallelism_threads=NUM_THREADS))
Run Code Online (Sandbox Code Playgroud)
编辑:
我的训练数据集太大而无法容纳到内存中,因此我的代码一次只能从磁盘读取1,000条记录.现在我想使用Tensorflow的新数据集API.数据集API是否允许我指定要保留在内存中的记录数,或者Tensorflow是否自动管理内存以便我不必?
我是 JavaScript 新手,很难链接多个 Promise.all() 语句。下面是我的代码的高度简化版本。
function a(x) {
return new Promise(function(resolve) {
setTimeout(resolve(x*2), 500)
});
}
function b(x) {
return new Promise(function(resolve) {
setTimeout(resolve(x*3), 400)
});
}
function c(x) {
const promises = [a(x),b(x)];
Promise.all(promises).then(function(y){
z = y[0] + y[1]
return new Promise(function(resolve, reject){
resolve(z);
});
});
}
function d(x) {
const promises = [];
for (let input of x){
promises.push(c(input))
}
Promise.all(promises).then(function(z){
console.log(z);
});
}
const data = [1,2,3,4,5];
d(data);Run Code Online (Sandbox Code Playgroud)
我希望看到这个打印出来:
[5, 10, 15, 20, 25]
Run Code Online (Sandbox Code Playgroud)
但我看到的是这样的:
[undefined, undefined, …Run Code Online (Sandbox Code Playgroud) 我使用包show_prediction中的函数eli5来了解我的 XGBoost 分类器如何得出预测。由于某种原因,我似乎得到了模型的回归分数而不是概率。
下面是一个使用公共数据集的完全可重现的示例。
from sklearn.datasets import load_breast_cancer
from xgboost import XGBClassifier
from sklearn.model_selection import train_test_split
from eli5 import show_prediction
# Load dataset
data = load_breast_cancer()
# Organize our data
label_names = data['target_names']
labels = data['target']
feature_names = data['feature_names']
features = data['data']
# Split the data
train, test, train_labels, test_labels = train_test_split(
features,
labels,
test_size=0.33,
random_state=42
)
# Define the model
xgb_model = XGBClassifier(
n_jobs=16,
eval_metric='auc'
)
# Train the model
xgb_model.fit(
train,
train_labels
)
show_prediction(xgb_model.get_booster(), …Run Code Online (Sandbox Code Playgroud) 我下面的代码生成了一张图,其中x轴上的标签被压在一起并且难以阅读。我怎样才能每隔两年而不是全部显示?
import seaborn as sns
import pandas as pd
years = list(range(1996,2017))
count = [2554.9425,2246.3233,1343.7322,973.9502,706.9818,702.0039,725.9288,
598.7818,579.0219,485.8281,474.9578,358.1385,311.4344,226.3332,
161.4288,132.7368,78.1659,39.8121,23.1321,0.2232,0.0015]
df = pd.DataFrame({'year':years, 'count':count})
dot_size = 0.7
ax = sns.pointplot(x="year",y="count", scale = dot_size, data=df)
ax.set(xlabel='Year', ylabel='Amount')
ax.set(ylim=(0, 3000))
Run Code Online (Sandbox Code Playgroud)
在 OSX 10.12.6 上运行 Dask 0.16.0 时,我无法将本地连接dask-worker到本地dask-scheduler. 我只是想遵循官方Dask 教程。重现步骤:
第 1 步:运行dask-scheduler
第 2 步:运行dask-worker 10.160.39.103:8786
该问题似乎与 dask 调度程序有关,而不是与工作人员有关,因为我什至无法通过其他方式访问该端口(例如nc -zv 10.160.39.103 8786)。
然而,该进程显然仍在机器上运行:
想象一下,我有一个这样的表:
CREATE TABLE time_series (
snapshot_date DATE,
sales INTEGER,
PRIMARY KEY (snapshot_date));
Run Code Online (Sandbox Code Playgroud)
使用这样的值:
INSERT INTO time_series SELECT '2017-01-01'::DATE AS snapshot_date,10 AS sales;
INSERT INTO time_series SELECT '2017-01-02'::DATE AS snapshot_date,4 AS sales;
INSERT INTO time_series SELECT '2017-01-03'::DATE AS snapshot_date,13 AS sales;
INSERT INTO time_series SELECT '2017-01-04'::DATE AS snapshot_date,7 AS sales;
INSERT INTO time_series SELECT '2017-01-05'::DATE AS snapshot_date,15 AS sales;
INSERT INTO time_series SELECT '2017-01-06'::DATE AS snapshot_date,8 AS sales;
Run Code Online (Sandbox Code Playgroud)
我希望能够做到这一点:
SELECT a.snapshot_date,
AVG(b.sales) AS sales_avg,
COUNT(*) AS COUNT
FROM time_series AS a …Run Code Online (Sandbox Code Playgroud) python ×4
amazon-ec2 ×2
tensorflow ×2
apache-kafka ×1
dask ×1
hadoop ×1
hive ×1
javascript ×1
postgresql ×1
pyspark-sql ×1
r ×1
seaborn ×1
sql ×1
tm ×1
xgboost ×1