小编pol*_*olo的帖子

如何针对另一个数据帧过滤一个spark数据帧

我正在尝试将一个数据帧与另一个数据帧进行过滤:

scala> val df1 = sc.parallelize((1 to 100).map(a=>(s"user $a", a*0.123, a))).toDF("name", "score", "user_id")
scala> val df2 = sc.parallelize(List(2,3,4,5,6)).toDF("valid_id")
Run Code Online (Sandbox Code Playgroud)

现在我想过滤df1并返回一个包含df1中所有行的数据帧,其中user_id在df2("valid_id")中.换句话说,我想要df1中的所有行,其中user_id是2,3,4,5或6

scala> df1.select("user_id").filter($"user_id" in df2("valid_id"))
warning: there were 1 deprecation warning(s); re-run with -deprecation for details
org.apache.spark.sql.AnalysisException: resolved attribute(s) valid_id#20 missing from user_id#18 in operator !Filter user_id#18 IN (valid_id#20);  
Run Code Online (Sandbox Code Playgroud)

另一方面,当我尝试对函数进行过滤时,一切看起来都很棒:

scala> df1.select("user_id").filter(($"user_id" % 2) === 0)
res1: org.apache.spark.sql.DataFrame = [user_id: int]
Run Code Online (Sandbox Code Playgroud)

为什么我收到此错误?我的语法有问题吗?

以下评论我试图做左外连接:

scala> df1.show
+-------+------------------+-------+
|   name|             score|user_id|
+-------+------------------+-------+
| user 1|             0.123|      1|
| user 2|             0.246|      2|
| user …
Run Code Online (Sandbox Code Playgroud)

scala apache-spark apache-spark-sql spark-dataframe

13
推荐指数
1
解决办法
2万
查看次数

将spark数据框中的多行合并为一行

我有一个包含2列的数据框:timestamp,value timestamp是自纪元以来的时间,value是浮点值.我希望将行合并为min的平均值.这意味着我想要获取时间戳来自同一轮分钟(自纪元以来间隔60秒)的所有行,并将它们合并为一行,其中值列将是所有值的平均值.

举个例子,假设我的数据框看起来像这样:

timestamp      value
---------      -----
1441637160      10.0
1441637170      20.0
1441637180      30.0
1441637210      40.0
1441637220      10.0
1441637230      0.0
Run Code Online (Sandbox Code Playgroud)

前4行是同一分钟的一部分(1441637160%60 == 0,1441637160 + 60 == 1441637220)最后2行是另一分钟的一部分.我想合并同一分钟的所有行.得到一个看起来像的结果:

timestamp      value
---------      -----
1441637160      25.0  (since (10+20+30+40)/4 = 25)
1441637220      5.0   (since (10+0)/2 = 5)
Run Code Online (Sandbox Code Playgroud)

最好的方法是什么?

dataframe apache-spark rdd apache-spark-sql

10
推荐指数
1
解决办法
9752
查看次数

在运行时更改代码

我有一个指向函数的指针(我从vtable获得),我想通过在运行时更改汇编代码(更改几个字节)来编辑函数.我尝试使用memset并尝试直接分配新值(类似于mPtr [0] = X,mPtr [1] = Y等)但我不断得到分段错误.我该如何更改代码?

(我正在使用C++)

操作系统是Windows.

c++ assembly runtime

8
推荐指数
1
解决办法
3244
查看次数

将未知大小的文件上传到 S3 的最佳策略

我有一个服务器端应用程序,它运行大量图像 URL 并将图像从这些 URL 上传到 S3。这些文件通过 HTTP 提供。我使用InputStreamI get from an HttpURLConnectionusinggetInputStream方法下载它们。我将 InputStream 交给 AWS S3 客户端putObject方法 (AWS Java SDK v1) 以将流上传到 S3。到现在为止还挺好。

我正在尝试引入一个新的外部图像数据源。此数据源的问题在于提供这些图像的 HTTP 服务器不返回Content-LengthHTTP 标头。这意味着我无法确定图像有多少字节,这是 AWS S3 客户端验证图像是否正确从流上传到 S3 所需的数字。

我能想到的处理这个问题的唯一方法是让服务器所有者将Content-LengthHTTP 标头添加到他们的响应中(不太可能),或者先将文件下载到内存缓冲区,然后从那里将其上传到 S3。

这些不是大文件,但我有很多。

在考虑先下载文件时,我担心内存占用和并发影响(无法同时上传和下载同一文件的块)。

由于我正在处理许多小文件,我怀疑如果我专注于多个文件而不是单个文件的并发性,并发性问题可能会“解决”。因此,我将使用我的 IO 有效地下载一个文件,同时上传另一个文件,而不是同时下载和上传同一文件的块。

我会喜欢你关于如何做到这一点的想法、最佳实践、陷阱或关于如何最好地解决这个问题的任何其他想法。

java io concurrency scala amazon-s3

7
推荐指数
0
解决办法
1314
查看次数

要使用rabbitmq队列,我真的需要声明交换器和队列吗?

在我在网上找到的所有示例中,我看到交换和队列在消息被消耗之前被声明。宣布交换似乎很奇怪,因为我为什么要这样做?我正在使用一个队列,它可能绑定到多个交换器(或者没有绑定到任何交换器,也许它只是有旧消息在等待)。

另外,我想不出为什么要声明一个队列。这将要求我了解有关队列的信息,而我不需要知道这些信息来使用它(例如 auto_delete 和持久性)。

当我在本地测试它时,我可以在不声明任何内容的情况下使用队列。有用。所以我想知道,为什么我在网上看到的每个示例都声明交换和队列,即使它只是消耗它?

谢谢!!!

amqp rabbitmq rabbitmq-exchange

6
推荐指数
2
解决办法
2508
查看次数

Akka TCP客户端:如何使用akka actor通过TCP发送消息

我想通过TCP发送文本消息.满容易.我想用akka这样做.我读了这篇关于akka IO的文章:http://doc.akka.io/docs/akka/snapshot/scala/io-tcp.html

本文介绍了TCP客户端的简单实现,但我不清楚如何使用此客户端.

  1. 构造函数采用InetSocketAddress和ActorRef.InetSocketAddress是有道理的(我假设这是目的地),但是ActorRef是什么?这是我第一次使用akka,但据我所知,ActorRef是另一个actor的引用.由于我的TCP客户端是一个actor,我希望这个TCP actor与TCP服务器通信,而不是与另一个actor通信,为什么我会给它一个actor ref?

  2. 伴侣对象中的道具功能是什么?

  3. 一旦实例化,我将如何使用此actor发送TCP消息?我应该以ByteString的形式向我发送一条包含我要发送的数据的消息吗?

4.什么是连接/区别

case Received(data) => 
    listener ! data
Run Code Online (Sandbox Code Playgroud)

case data: ByteString =>
    connection ! Write(data)
Run Code Online (Sandbox Code Playgroud)

sockets tcp scala akka akka-io

5
推荐指数
1
解决办法
2881
查看次数

在 python attrs 类中,如何用我自己的重写生成的 __init__

所以我喜欢使用 attr 但有时我需要做我自己的事情。我可以__init__用自己的方法覆盖该方法吗?

import attr
@attr.s(auto_attribs=True)
class MyClass:
     i: int
     def __init__(self, i, special=None):
          if special:
               self.i = special
          else:
               self.i = i
>>> a = MyClass(i=1,special=2)
Traceback (most recent call last):
  File "<input>", line 1, in <module>
    a = MyClass(i=1,special=2)
TypeError: __init__() got an unexpected keyword argument 'special'
Run Code Online (Sandbox Code Playgroud)

另一个例子:

@attr.s(auto_attribs=True)
class MyOtherClass:
     i: int
     def __init__(self, i, **kwargs):
         self.i = kwargs.get('magic',i)



>>> a = MyOtherClass(i=5,magic=12)
Traceback (most recent call last):
  File "<input>", line 1, in <module>
    a …
Run Code Online (Sandbox Code Playgroud)

python python-3.7 python-attrs

5
推荐指数
1
解决办法
5866
查看次数

如何将erlang dbg附加到正在运行的进程?

如何将调试器附加到正在运行的erlang进程(rabbitmq)?我有正在运行的相同兔子版本的源代码.我想在源代码行上设置断点,并将调试器附加到正在运行的Rabbit实例.我不确定erlang是否需要调试符号async_dirty.

在一个完美的世界里,我希望能够在本地和远程做到这一点.

我是一个二郎初学者,我甚至不会说我是二郎的新手.我正在尝试学习它,因为我调试了一些rabbitmq插件.

debugging erlang rabbitmq rabbitmq-exchange

2
推荐指数
1
解决办法
1388
查看次数

通过多个连接同时从FTP服务器下载同一文件的不同部分

我想从FTP服务器下载很大的文件。是否可以从特定的偏移量开始下载?我的用例是:1.传输期间连接断开,我可以继续下载而不是重新开始吗?2.是否可以使用具有不同偏移量的多个连接来同时使用多个连接来更快地下载文件?

我在scala中工作,因此理想情况下是在寻找提供此功能的JVM库。我找不到任何提供此功能的工具,甚至找不到支持偏移等FTP协议的证据。

java ftp scala protocols

2
推荐指数
1
解决办法
121
查看次数

Python简单语句的语法是什么?

我阅读了这篇文档:http: //docs.python.org/reference/simple_stmts.html

现在,我想创建它描述的语句.例如,一个连接多个断言和打印语句的语句.语法不清楚.我该如何使用::=运营商?

我将很感激一个明确的例子.

python

0
推荐指数
1
解决办法
737
查看次数