小编voi*_*oid的帖子

DynamoDB:SET list_append使用aws sdk无法正常工作

我需要使用相应的键将字符串附加到dynamodb表中的字符串集.这是我用来执行updateItem的Update表达式:

  var params = {
    "TableName" : tableName,
    "Key": {
      "ID": {
        S: "20000"
      }
    },
    "UpdateExpression" : "SET #attrName = list_append(#attrName, :attrValue)",
    "ExpressionAttributeNames" : {
      "#attrName" : "entries"
    },
    "ExpressionAttributeValues" : {
      ":attrValue" : {"SS":["000989"]}
    }   };
Run Code Online (Sandbox Code Playgroud)

这在我使用aws cli执行updateItem()时有效.但是当在nodejs中使用aws-sdk时,我收到错误:

Invalid UpdateExpression: Incorrect operand type for operator or function; operator or function: list_append, operand type: M\n
Run Code Online (Sandbox Code Playgroud)

有帮助吗?谢谢

amazon-web-services node.js amazon-dynamodb aws-sdk

19
推荐指数
2
解决办法
1万
查看次数

Spark Dataframes UPSERT到Postgres表

我正在使用Apache Spark DataFrames连接两个数据源并将结果作为另一个DataFrame获取.我想将结果写入另一个Postgres表.我看到这个选项:

myDataFrame.write.jdbc(url, table, connectionProperties)
Run Code Online (Sandbox Code Playgroud)

但是,我想要做的是根据表的主键将数据帧放入表中.怎么做?我正在使用Spark 1.6.0.

postgresql scala dataframe apache-spark apache-spark-sql

14
推荐指数
3
解决办法
2万
查看次数

使用MongoDB Java 3.0驱动程序批量Upsert

在早期版本的MongoDB Java驱动程序中,要运行查询并对结果执行无序批量upsert,我们所做的就是:

BulkWriteOperation bulk = dbCollection.initializeUnorderedBulkOperation();
    bulk.find(searchQuery).upsert().update(new BasicDBObject("$set", getDbObjectModel()));
Run Code Online (Sandbox Code Playgroud)

但是在版本3中,随着Bson Document支持和MongoCollection.bulkWrite()方法的引入,如何才能做到这一点?

我试过这个:

List<WriteModel<Document>> documentList = new ArrayList<>();

collection.bulkWrite(documentList, new BulkWriteOptions().ordered(false));
Run Code Online (Sandbox Code Playgroud)

但是,我需要upsert功能.

谢谢.

java upsert mongodb mongo-java-driver

11
推荐指数
2
解决办法
1万
查看次数

比较内存中的集群计算系统

我正在研究Spark(伯克利)集群计算系统.在我的研究中,我了解了其他一些内存系统,如Redis,Memcachedb等.如果有人能给我SPARK和REDIS(以及MEMCACHEDB)之间的比较,那将会很棒.在什么情况下Spark比其他内存系统有优势?

memcachedb redis apache-spark apache-storm

10
推荐指数
1
解决办法
5367
查看次数

在被Driver停止后,Spark流式传输作业失败

我有一个火花流工作,从Kafka读取数据并对其进行一些操作.我正在通过一个纱线集群Spark 1.4.1运行这个工作,它有两个节点,每个节点有16 GB RAM,每个节点有16个核心.

我把这些conf传递给了spark-submit工作:

--master yarn-cluster --num-executors 3 --driver-memory 4g --executor-memory 2g --executor-cores 3

作业返回此错误并在运行一段时间后结束:

INFO yarn.ApplicationMaster: Final app status: FAILED, exitCode: 11,
(reason: Max number of executor failures reached)

.....

ERROR scheduler.ReceiverTracker: Deregistered receiver for stream 0:
Stopped by driver
Run Code Online (Sandbox Code Playgroud)

更新 :

这些日志也被发现:

INFO yarn.YarnAllocator: Received 3 containers from YARN, launching executors on 3 of them.....

INFO yarn.ApplicationMaster$AMEndpoint: Driver terminated or disconnected! Shutting down.

....

INFO yarn.YarnAllocator: Received 2 containers from YARN, launching executors on 2 of them. …
Run Code Online (Sandbox Code Playgroud)

apache-kafka hadoop-yarn apache-spark spark-streaming

8
推荐指数
1
解决办法
2539
查看次数

Spark:如何使用mapPartition并为每个分区创建/关闭连接

所以,我想对我的spark DataFrame进行某些操作,将它们写入DB并在最后创建另一个DataFrame.它看起来像这样:

import sqlContext.implicits._

val newDF = myDF.mapPartitions(
  iterator => {
    val conn = new DbConnection
    iterator.map(
       row => {
         addRowToBatch(row)
         convertRowToObject(row)
     })
    conn.writeTheBatchToDB()
    conn.close()
  })
  .toDF()
Run Code Online (Sandbox Code Playgroud)

这给了我一个错误,因为mapPartitions期望返回类型Iterator[NotInferedR],但在这里Unit.我知道这可以用forEachPartition,但我也想做映射.分开进行将是一个开销(额外的火花工作).该怎么办?

谢谢!

scala apache-spark rdd

8
推荐指数
2
解决办法
6550
查看次数

地图和udf之间的区别

当我在Spark中使用DataFrame时,我有时只需要编辑该DataFrame中特定列的值.例如.如果count我的数据框中有一个字段,如果我想添加1每个值count,那么我可以编写一个自定义withColumn的udf 来使用DataFrames 的功能完成工作,或者我可以map在DataFrame上做一个然后从结果RDD中提取另一个DataFrame.

我想知道的是udf实际上是如何工作的.在这种情况下,使用map/udf给我一个比较.性能差异是什么?

谢谢!

scala apache-spark udf

8
推荐指数
1
解决办法
2456
查看次数

MongoDb BSON以UTC时间存储日期

如果我尝试在Document(BSON)中放置一个日期字段并将其写入Mongo,BSON会将其写入UTC.例如,约会

DateTime dateTime = new DateTime("2015-07-01");
Document doc = new Document("date", dateTime.toDate());
Run Code Online (Sandbox Code Playgroud)

将被存储为

"date" : ISODate("2015-06-30T18:30:00Z")
Run Code Online (Sandbox Code Playgroud)

在蒙戈.而且,如果我使用相同的Java驱动程序检索它,我会得到它

Wed Jul 01 00:00:00 IST 2015
Run Code Online (Sandbox Code Playgroud)

大.这个没有解决方案吗?我的意思是,为什么我不能按照自己的意愿存储日期?如果我需要从另一个时区查询数据库怎么办?我会得到不同的结果?日期字段是Mongo的重要组成部分,其中包含丰富的运算符.不过,为什么Mongo不提供这种灵活性?谢谢

java date mongodb mongo-java-driver

5
推荐指数
2
解决办法
9893
查看次数

Spark ml余弦相似度:如何获得1到n相似度分数

我读到我可以使用columnSimilarities随附的方法RowMatrix来查找各种记录(基于内容)的余弦相似度。我的数据如下所示:

genre,actor
horror,mohanlal shobhana pranav 
comedy,mammooty suraj dulquer
romance,fahad dileep manju
comedy,prithviraj
Run Code Online (Sandbox Code Playgroud)

现在,我创建了一个spark-ml管道来计算上述文本特征(体裁,演员)的tf-idf,并VectorAssembler在管道中使用来将这两个特征组合成一个单独的列“特征”。之后,我将获得的结果转换为DataFrame

val vectorRdd = finalDF.map(row => row.getAs[Vector]("features"))
Run Code Online (Sandbox Code Playgroud)

转换成 RDD[Vector]

然后,我RowMatrix通过

val matrix = new RowMatrix(vectorRdd)
Run Code Online (Sandbox Code Playgroud)

我下面这个指南,以余弦相似,我需要什么参考火花mllib的方法来找到特定的记录和所有其他人等之间的相似性在sklearn方法,如图所示,指南:

cosine_similarity(tfidf_matrix[0:1], tfidf_matrix)
Run Code Online (Sandbox Code Playgroud)

但是,我找不到如何执行此操作。我不明白matrix.columnSimilarities()比较和返回的内容。有人可以帮我找我的东西吗?

任何帮助表示赞赏!谢谢。

scala cosine-similarity apache-spark apache-spark-ml apache-spark-mllib

5
推荐指数
1
解决办法
1559
查看次数

MongoDB:如何获取集合中最新文档的最新时间戳

是否有一个简单的或优雅的方法(或我可以写的查询)来检索集合中(最后更新的文档的)最后更新的时间戳。我可以编写这样的查询来查找最后插入的文档

db.collection.find().limit(1).sort({$natural:-1})
Run Code Online (Sandbox Code Playgroud)

但我需要有关最新更新文档的信息(可以是插入内容或更新内容)。

我知道一种方法是从集合中查询oplog集合中的最后一条记录。但是考虑到oplog的大小可能非常大(似乎也不可靠,因为它是一个受限制的集合),这似乎是一项昂贵的操作。有一个更好的方法吗?

谢谢!

mongodb mongodb-query mongodb-oplog

5
推荐指数
3
解决办法
1万
查看次数