我有一个返回值为Future [Int]的函数
def func: Future[Int] = {...}
Run Code Online (Sandbox Code Playgroud)
我将定期检查func的返回值,直到它满足某些条件(例如返回值大于10),然后我将使用此返回值来创建map/flatmap的其他未来值.
如何在没有任何同步代码的情况下完成这项工作?如下所列:
def check: Future[Int] = {
var ret: Int = Await.result(func, Duration.Inf)
while (ret <= 10) {
ret = Await.result(func, Duration.Inf)
}
Future(ret + 100)
}
Run Code Online (Sandbox Code Playgroud) 我正在使用以下方法将数据集写入 json:
ds.coalesce(1).write.format("json").option("nullValue",null).save("project/src/test/resources")
Run Code Online (Sandbox Code Playgroud)
对于包含空值列的记录,json 文档根本不写入该键。
有没有办法对 json 输出强制执行空值键?
这是必需的,因为我使用此 json 将其读取到另一个数据集(在测试用例中)并且如果某些文档没有案例类中的所有键,则无法强制执行模式(我通过将 json 文件放在资源下来读取它文件夹并通过 RDD[String] 转换为数据集,如下所述:https : //databaseline.bitbucket.io/a-quickie-on-reading-json-resource-files-in-apache-spark/)
我安装了rabbitmq并正常工作,我知道如何接收日志,但不知道如何将其显示到带烧瓶的UI.
flask_app.py
from flask import Flask
from threading import Thread
app = Flask(__name__)
import pika
connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
channel = connection.channel()
channel.exchange_declare(exchange='logs',
type='fanout')
result = channel.queue_declare(exclusive=True)
queue_name = result.method.queue
channel.queue_bind(exchange='logs',
queue=queue_name)
print('[*] Waiting for logs. To exit press CTRL+C')
def callback(ch, method, properties, body):
print(body)
channel.basic_consume(callback,
queue=queue_name,
no_ack=True)
thread = Thread(channel.start_consuming())
thread.start()
@app.route('/')
def index():
return 'hi'
Run Code Online (Sandbox Code Playgroud)
我不知道如何使用多线程来运行flask应用程序并不断从队列接收日志.
以下是数据框列的外观。
df1='device number', 'date', ....<<10 个其他列>> 3500 条记录
df2='device number', 'date', ....<<9 个其他列>> 14,000 条记录
在每个数据帧中,“设备编号”和“日期”都不是唯一的。但是,它们的组合对于标识行是唯一的。
我试图形成一个新的数据框,它匹配来自 df1 和 df2 的行,其中设备号和日期都相等,并且具有这些 df1 和 df2 中的所有列。我正在尝试的熊猫命令是
df3=pd.merge(df1, df2, how='inner', on=['device number', 'date'])
Run Code Online (Sandbox Code Playgroud)
但是,df3 给了我一个形状为 (14,000, 21) 的数据框。列号是有道理的,但是内连接的行如何比任何左侧数据帧都多?这是否意味着我对内连接的理解有缺陷?另外,我怎样才能达到我描述的结果?
我正在 azure hdinsight Spark2 集群中安排具有以下结构的 oozie 作业。我使用以下命令安排了作业,
oozie job -config /job.properties -run
oozie job -config /coordinator.properties -run
Run Code Online (Sandbox Code Playgroud)
但我收到以下错误
Status: ERROR
Error Code: JA018
Error Message: Main class [org.apache.oozie.action.hadoop.ShellMain], exit code
Run Code Online (Sandbox Code Playgroud)
我的工作流程.xml 文件:
<workflow-app name="sparkshellwf" xmlns="uri:oozie:workflow:0.3">
<start to="sparkshellwf"/>
<action name="sparkshellwf">
<shell xmlns="uri:oozie:shell-action:0.1">
<job-tracker>${jobTracker}</job-tracker>
<name-node>${nameNode}</name-node>
<configuration>
<property>
<name>mapred.job.queue.name</name>
<value>${queueName}</value>
</property>
</configuration>
<exec>$SPARK_HOME/bin/spark-submit</exec>
<!--Adding all arguments needed/optional for Spark-submit here-->
<argument>--class</argument>
<argument>${Spark_Driver}</argument>
<argument>--master</argument>
<argument>${Spark_Master}</argument>
<argument>--deploy-mode</argument>
<argument>${Spark_Mode}</argument>
<argument>--num-executors</argument>
<argument>${numExecutors}</argument>
<argument>--driver-memory</argument>
<argument>${driverMemory}</argument>
<argument>--executor-memory</argument>
<argument>${executorMemory}</argument>
<argument>--executor-cores</argument>
<argument>${executorCores}</argument>
<argument>${workflowRoot}/lib/${sparkJar}</argument>
</shell>
<ok to="end"/>
<error to="fail"/>
</action>
<kill name="fail"> …Run Code Online (Sandbox Code Playgroud) 我有一个RDD,我需要将其转换为数据集,我试过:
Dataset<Person> personDS = sqlContext.createDataset(personRDD, Encoders.bean(Person.class));
Run Code Online (Sandbox Code Playgroud)
以上行抛出错误,
无法解析方法createDataset(org.apache.spark.api.java.JavaRDD Main.Person,org.apache.spark.sql.Encoder T)
但是,我可以转换为Dataset转换为Dataframe.以下代码有效:
Dataset<Row> personDF = sqlContext.createDataFrame(personRDD, Person.class);
Dataset<Person> personDS = personDF.as(Encoders.bean(Person.class));
Run Code Online (Sandbox Code Playgroud) 我需要单独使用 Spark 数据集编写字数统计逻辑。
我使用JavaRDDSpark 类实现了相同的过程,但我想通过使用Dataset<Row>Spark SQL 类来完成相同的过程。
如何在 Spark SQL 中进行字数统计?
我在spark sql中按行动进行分组.在某些行中包含具有不同ID的相同值.在这种情况下,我想选择第一行.
这是我的代码.
val highvalueresult = highvalue.select($"tagShortID", $"Timestamp", $"ListenerShortID", $"rootOrgID", $"subOrgID", $"RSSI_Weight_avg")
.groupBy("tagShortID", "Timestamp").agg(max($"RSSI_Weight_avg")
.alias("RSSI_Weight_avg"))
val t2 = averageDF.join(highvalueresult, Seq("tagShortID", "Timestamp", "RSSI_Weight_avg"))
Run Code Online (Sandbox Code Playgroud)
这是我的结果.
tag,timestamp,rssi,listner,rootorg,suborg
2,1496745906,0.7,3878,4,3
4,1496745907,0.6,362,4,3
4,1496745907,0.6,718,4,3
4,1496745907,0.6,1901,4,3
Run Code Online (Sandbox Code Playgroud)
在上面的结果为时间戳1496745907三个listner相同的rssi值.在这种情况下,我想选择第一行.
我已在 Azure 表存储中创建了数据,但如果不登录 Azure 存储资源管理器,则无法查看表存储中加载的数据。
我想从 Azure 云门户 ( ) 查看 Azure 表存储中加载的数据portal.azure.com。是否有可能做到这一点?
我只是尝试用 Spark 制作数据框。我只是尝试编写如下代码。
首先,我导入如下
import org.apache.spark.sql.types._
import org.apache.spark.storage.StorageLevel
import scala.io.Source
import scala.collection.mutable.HashMap
import java.io.File
import org.apache.spark.sql.Row
import scala.collection.mutable.ListBuffer
import org.apache.spark.util._
import org.apache.spark.sql.types.IntegerType`
Run Code Online (Sandbox Code Playgroud)
然后,我尝试为数据框创建行和架构,如下所示。
val Employee = Seq(Row("Kim","Seoul","1000000"),Row("Lee","Busan","2000000"),Row("Park","Jeju","3000000"),Row("Jeong","Daejon","3400000"))
val EmployeeSchema = List(StructField("Name", StringType, true), StructField("City", StringType, true), StructField("Salary", IntegerType, true))
val EmpDF = spark.createDataFrame(spark.sparkContext.parallelize(Employee),StructType(EmployeeSchema))
Run Code Online (Sandbox Code Playgroud)
最后,我尝试查看数据框是否可以使用
EmpDF.show
Run Code Online (Sandbox Code Playgroud)
我收到如下错误
ERROR Executor: Exception in task 2.0 in stage 1.0 (TID 3)
java.lang.RuntimeException: Error while encoding: java.lang.RuntimeException:
java.lang.String is not a valid external type for schema of int
if (assertnotnull(input[0, org.apache.spark.sql.Row, true]).isNullAt) null else staticinvoke(class …Run Code Online (Sandbox Code Playgroud) 是str以某种方式隐式转换为str.charAt,所以map函数有效吗?
val str = "abcdefghij"
println(List.range(0, 10).map(str));
Run Code Online (Sandbox Code Playgroud) apache-spark ×6
scala ×5
azure ×2
java ×2
python ×2
collections ×1
databricks ×1
dataframe ×1
flask ×1
inner-join ×1
join ×1
json ×1
oozie ×1
pandas ×1
rabbitmq ×1
rabbitmqctl ×1