小编sat*_*hya的帖子

在scala中用Future替换while循环

我有一个返回值为Future [Int]的函数

def func: Future[Int] = {...}
Run Code Online (Sandbox Code Playgroud)

我将定期检查func的返回值,直到它满足某些条件(例如返回值大于10),然后我将使用此返回值来创建map/flatmap的其他未来值.

如何在没有任何同步代码的情况下完成这项工作?如下所列:

def check: Future[Int] = {
    var ret: Int = Await.result(func, Duration.Inf)
    while (ret <= 10) {
        ret = Await.result(func, Duration.Inf)
    }
    Future(ret + 100)
}
Run Code Online (Sandbox Code Playgroud)

scala

7
推荐指数
1
解决办法
1011
查看次数

使用架构中的所有键(包括空列)将 spark 数据集写入 json

我正在使用以下方法将数据集写入 json:

ds.coalesce(1).write.format("json").option("nullValue",null).save("project/src/test/resources")
Run Code Online (Sandbox Code Playgroud)

对于包含空值列的记录,json 文档根本不写入该键。

有没有办法对 json 输出强制执行空值键?

这是必需的,因为我使用此 json 将其读取到另一个数据集(在测试用例中)并且如果某些文档没有案例类中的所有键,则无法强制执行模式(我通过将 json 文件放在资源下来读取它文件夹并通过 RDD[String] 转换为数据集,如下所述:https : //databaseline.bitbucket.io/a-quickie-on-reading-json-resource-files-in-apache-spark/

json scala apache-spark databricks

7
推荐指数
1
解决办法
6618
查看次数

同时从Rabbitmq接收日志并运行您的烧瓶应用程序

我安装了rabbitmq并正常工作,我知道如何接收日志,但不知道如何将其显示到带烧瓶的UI.

flask_app.py

from flask import Flask
from threading import Thread
app = Flask(__name__)
import pika
connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
channel = connection.channel()

channel.exchange_declare(exchange='logs',
                     type='fanout')

result = channel.queue_declare(exclusive=True)
queue_name = result.method.queue

channel.queue_bind(exchange='logs',
               queue=queue_name)

print('[*] Waiting for logs. To exit press CTRL+C')
def callback(ch, method, properties, body):
    print(body)

channel.basic_consume(callback,
                      queue=queue_name,
                      no_ack=True)

thread = Thread(channel.start_consuming())
thread.start()

@app.route('/')
def index():
    return 'hi'
Run Code Online (Sandbox Code Playgroud)

我不知道如何使用多线程来运行flask应用程序并不断从队列接收日志.

python multithreading rabbitmq flask rabbitmqctl

7
推荐指数
1
解决办法
297
查看次数

熊猫数据帧中的内部连接/合并比左数据帧提供更多的行

以下是数据框列的外观。

df1='device number', 'date', ....<<10 个其他列>> 3500 条记录

df2='device number', 'date', ....<<9 个其他列>> 14,000 条记录

在每个数据帧中,“设备编号”和“日期”都不是唯一的。但是,它们的组合对于标识行是唯一的。

我试图形成一个新的数据框,它匹配来自 df1 和 df2 的行,其中设备号和日期都相等,并且具有这些 df1 和 df2 中的所有列。我正在尝试的熊猫命令是

df3=pd.merge(df1, df2, how='inner', on=['device number', 'date'])
Run Code Online (Sandbox Code Playgroud)

但是,df3 给了我一个形状为 (14,000, 21) 的数据框。列号是有道理的,但是内连接的行如何比任何左侧数据帧都多?这是否意味着我对内连接的理解有缺陷?另外,我怎样才能达到我描述的结果?

python join inner-join dataframe pandas

6
推荐指数
1
解决办法
5323
查看次数

错误代码:JA018 在 HDInsight Spark2 群集中运行 Oozie 工作流

我正在 azure hdinsight Spark2 集群中安排具有以下结构的 oozie 作业。我使用以下命令安排了作业,

oozie job -config /job.properties -run
oozie job -config /coordinator.properties -run
Run Code Online (Sandbox Code Playgroud)

但我收到以下错误

Status: ERROR
Error Code: JA018
Error Message: Main class [org.apache.oozie.action.hadoop.ShellMain], exit code
Run Code Online (Sandbox Code Playgroud)

在此输入图像描述

我的工作流程.xml 文件:

<workflow-app name="sparkshellwf" xmlns="uri:oozie:workflow:0.3">
  <start to="sparkshellwf"/>
  <action name="sparkshellwf">
    <shell xmlns="uri:oozie:shell-action:0.1">
      <job-tracker>${jobTracker}</job-tracker>
      <name-node>${nameNode}</name-node>
      <configuration>
        <property>
          <name>mapred.job.queue.name</name>
          <value>${queueName}</value>
        </property>
      </configuration>
      <exec>$SPARK_HOME/bin/spark-submit</exec>
      <!--Adding all arguments needed/optional for Spark-submit here-->
      <argument>--class</argument>
      <argument>${Spark_Driver}</argument>
      <argument>--master</argument>
      <argument>${Spark_Master}</argument>
      <argument>--deploy-mode</argument>
      <argument>${Spark_Mode}</argument>
      <argument>--num-executors</argument>
      <argument>${numExecutors}</argument>
      <argument>--driver-memory</argument>
      <argument>${driverMemory}</argument>
      <argument>--executor-memory</argument>
      <argument>${executorMemory}</argument>
      <argument>--executor-cores</argument>
      <argument>${executorCores}</argument>
      <argument>${workflowRoot}/lib/${sparkJar}</argument>
    </shell>
    <ok to="end"/>
    <error to="fail"/>
  </action>
  <kill name="fail"> …
Run Code Online (Sandbox Code Playgroud)

azure oozie apache-spark oozie-coordinator azure-hdinsight

5
推荐指数
1
解决办法
6165
查看次数

在Java Spark中将RDD转换为Dataset

我有一个RDD,我需要将其转换为数据集,我试过:

Dataset<Person> personDS =  sqlContext.createDataset(personRDD, Encoders.bean(Person.class));
Run Code Online (Sandbox Code Playgroud)

以上行抛出错误,

无法解析方法createDataset(org.apache.spark.api.java.JavaRDD Main.Person,org.apache.spark.sql.Encoder T)

但是,我可以转换为Dataset转换为Dataframe.以下代码有效:

Dataset<Row> personDF = sqlContext.createDataFrame(personRDD, Person.class);
Dataset<Person> personDS = personDF.as(Encoders.bean(Person.class));
Run Code Online (Sandbox Code Playgroud)

java apache-spark

5
推荐指数
1
解决办法
1万
查看次数

如何使用Dataset API编写字数统计?

我需要单独使用 Spark 数据集编写字数统计逻辑。

我使用JavaRDDSpark 类实现了相同的过程,但我想通过使用Dataset<Row>Spark SQL 类来完成相同的过程。

如何在 Spark SQL 中进行字数统计?

java apache-spark apache-spark-sql

3
推荐指数
1
解决办法
1379
查看次数

分组并在spark sql中获取第一个值

我在spark sql中按行动进行分组.在某些行中包含具有不同ID的相同值.在这种情况下,我想选择第一行.

这是我的代码.

    val highvalueresult = highvalue.select($"tagShortID", $"Timestamp", $"ListenerShortID", $"rootOrgID", $"subOrgID",  $"RSSI_Weight_avg")
                          .groupBy("tagShortID", "Timestamp").agg(max($"RSSI_Weight_avg")
                          .alias("RSSI_Weight_avg"))

        val t2 = averageDF.join(highvalueresult, Seq("tagShortID", "Timestamp", "RSSI_Weight_avg"))
Run Code Online (Sandbox Code Playgroud)

这是我的结果.

tag,timestamp,rssi,listner,rootorg,suborg
2,1496745906,0.7,3878,4,3
4,1496745907,0.6,362,4,3
4,1496745907,0.6,718,4,3
4,1496745907,0.6,1901,4,3
Run Code Online (Sandbox Code Playgroud)

在上面的结果为时间戳1496745907三个listner相同的rssi值.在这种情况下,我想选择第一行.

scala apache-spark apache-spark-sql

3
推荐指数
1
解决办法
3180
查看次数

探索 Azure 云本身内的 Azure 表存储中的数据

我已在 Azure 表存储中创建了数据,但如果不登录 Azure 存储资源管理器,则无法查看表存储中加载的数据。

我想从 Azure 云门户 ( ) 查看 Azure 表存储中加载的数据portal.azure.com。是否有可能做到这一点?

azure azure-table-storage

3
推荐指数
1
解决办法
5876
查看次数

java.lang.String 不是创建 Spark 数据帧时 int 错误模式的有效外部类型

我只是尝试用 Spark 制作数据框。我只是尝试编写如下代码。

首先,我导入如下

import org.apache.spark.sql.types._
import org.apache.spark.storage.StorageLevel
import scala.io.Source
import scala.collection.mutable.HashMap
import java.io.File
import org.apache.spark.sql.Row
import scala.collection.mutable.ListBuffer
import org.apache.spark.util._
import org.apache.spark.sql.types.IntegerType`
Run Code Online (Sandbox Code Playgroud)

然后,我尝试为数据框创建行和架构,如下所示。

val Employee = Seq(Row("Kim","Seoul","1000000"),Row("Lee","Busan","2000000"),Row("Park","Jeju","3000000"),Row("Jeong","Daejon","3400000"))

val EmployeeSchema = List(StructField("Name", StringType, true), StructField("City", StringType, true), StructField("Salary", IntegerType, true))

val EmpDF = spark.createDataFrame(spark.sparkContext.parallelize(Employee),StructType(EmployeeSchema))
Run Code Online (Sandbox Code Playgroud)

最后,我尝试查看数据框是否可以使用

EmpDF.show
Run Code Online (Sandbox Code Playgroud)

我收到如下错误

    ERROR Executor: Exception in task 2.0 in stage 1.0 (TID 3)
    java.lang.RuntimeException: Error while encoding: java.lang.RuntimeException: 
    java.lang.String is not a valid external type for schema of int
    if (assertnotnull(input[0, org.apache.spark.sql.Row, true]).isNullAt) null else staticinvoke(class …
Run Code Online (Sandbox Code Playgroud)

scala runtime-error apache-spark

3
推荐指数
1
解决办法
2万
查看次数

list.map可以在scala中使用字符串

是str以某种方式隐式转换为str.charAt,所以map函数有效吗?

val str = "abcdefghij"
println(List.range(0, 10).map(str));
Run Code Online (Sandbox Code Playgroud)

collections scala

1
推荐指数
1
解决办法
60
查看次数