小编Mar*_*aci的帖子

过滤DataFrame最有效的方法是什么

...通过检查列的值是否在a中seq.
也许我没有解释得很好,我基本上希望这(使用常规的SQL表达出来)DF_Column IN seq

首先,我使用a broadcast var(我放置seq),UDF(检查完成)和registerTempTable.
问题是,我没有测试它,因为我遇到了一个已知的bug,显然只有使用时,会出现registerTempTableScalaIDE.

我最终创建了一个新DataFrameseq并与它进行内连接(交集),但我怀疑这是完成任务的最高效的方式.

谢谢

编辑:(回应@YijieShen):
如何filter根据一个DataFrame列的元素是否在另一个DF的列(如SQL select * from A where login in (select username from B))中?

例如:第一个DF:

login      count
login1     192  
login2     146  
login3     72   
Run Code Online (Sandbox Code Playgroud)

第二个DF:

username
login2
login3
login4
Run Code Online (Sandbox Code Playgroud)

结果:

login      count
login2     146  
login3     72   
Run Code Online (Sandbox Code Playgroud)

尝试:
EDIT-2:我认为,现在修复了这个bug,这些应该可行.结束编辑-2

ordered.select("login").filter($"login".contains(empLogins("username")))
Run Code Online (Sandbox Code Playgroud)

ordered.select("login").filter($"login" in empLogins("username"))
Run Code Online (Sandbox Code Playgroud)

两者Exception …

apache-spark apache-spark-sql

31
推荐指数
2
解决办法
3万
查看次数

如何提高BigQuery中GeoIP查询的性能?

我已经在BigQuery中加载了我的应用程序日志,我需要根据这些日志中的IP地址计算国家/地区.

我在我的表和从MaxMind下载的GeoIP映射表之间编写了一个连接查询.

理想的查询将OUTER JOIN使用范围过滤器,但BQ=在连接条件下支持.因此查询执行INNER JOIN并处理每一侧的缺失值JOIN.

我修改了原始查询,因此可以在维基百科公共数据集上运行.

有人可以帮助我让这个跑得更快吗?

SELECT id, client_ip, client_ip_code, B.Country_Name as Country_Name

FROM
    (SELECT id, contributor_ip as client_ip, INTEGER(PARSE_IP(contributor_ip)) AS client_ip_code, 1 AS One
    FROM [publicdata:samples.wikipedia] Limit 1000) AS A1

JOIN 
    (SELECT From_IP_Code, To_IP_Code, Country_Name, 1 AS One
    FROM

        -- 3 IP sets: 1.valid ranges, 2.Gaps, 3. Gap at the end of the set
        -- all Ranges of valid IPs:
        (SELECT From_IP_Code, To_IP_Code, Country_Name FROM [QA_DATASET.GeoIP]) …
Run Code Online (Sandbox Code Playgroud)

performance geoip google-bigquery

11
推荐指数
2
解决办法
3931
查看次数

在Kafka中解释复制偏移检查点和恢复点偏移

有些人可以解释这些文件的含义,存在于kafka代理日志中.

root @ a2md23297l:/ tmp/kafka-logs-1#cat recovery-point-offset-checkpoint
0
5
my-topic 0 0
kafkatopic_R2P1_1 0 0
my-topic 1 0
kafkatopic_R2P1 0 0
test 0 0
root @ a2md23297l:/ tmp/kafka -logs-1#cat replication-offset-checkpoint
0
5
my-topic 0 0
kafkatopic_R2P1_1 0 2
my-topic 1 0
kafkatopic_R2P1 0 2
test 0 57

Fyi,my-topic,kafkatopic_R2P1_1,my-topic,kafkatopic_R2P1,test是创建的主题.提前致谢.

apache-kafka kafka-consumer-api kafka-producer-api

4
推荐指数
1
解决办法
4931
查看次数

solr DIH中的delta-import问题

当我运行full_import命令时它工作正常.更新的日期将写入dataimport.properties文件.当我运行delta-import时,它会使Indexing失败.回滚所有更改.. dataimport.properties文件已创建,并在solr/conf/location中具有777权限.

请帮我解决这个问题.


@mbonaci

data-config.xml中的查询,deltaImportQuery和deltaQuery如下所示

    <entity name="item" query="select group_title,description,DATE_FORMAT(created_date, '%Y-%m-%dT%H:%i:%sZ') as createdDate,group_status, 'GROUP' as itemtype,group_id as id from collaboration_groups where group_status=1" 
deltaImportQuery="select group_title,description,DATE_FORMAT(created_date, '%Y-%m-%dT%H:%i:%sZ') as createdDate,group_status,group_id as id,'GROUP' as itemtype  from collaboration_groups where group_status=1 and group_id=${dataimporter.delta.id} "  
deltaQuery="select group_id from collaboration_groups where group_status=1 and  updated_date &gt; '${dataimporter.last_index_time}'" deletedPkQuery="select group_id  from  collaboration_groups  where group_status = 0 and updated_date &gt; '${dataimporter.last_index_time}'">
            <field column="id"  name="id" />
            <field column="itemtype" name="itemtype" />
            <field column="group_title" name="fullName" />
            <field column="description" name="description"/>
            <field column="createdDate" name="createdDate"/>
</entity>
Run Code Online (Sandbox Code Playgroud)

config.xml中dataImport请求处理程序的config参数如下

<requestHandler …
Run Code Online (Sandbox Code Playgroud)

solr dataimporthandler

1
推荐指数
1
解决办法
4675
查看次数