我试图将数据持久化到数据库中.我的持久化方法是异步的.
class MyActor(persistenceFactory:PersistenceFactory) extends Actor {
def receive: Receive = {
case record: Record =>
// this method is asynchronous, immediate return Future[Int]
persistenceFactory.persist(record)
}
}
Run Code Online (Sandbox Code Playgroud)
当应用程序在增加的负载下运行时,瓶颈就是我们内存不足或没有线程可用.
那么在Akka actor的receive方法中处理异步调用的最佳方法是什么?
我正在将数百万条记录摄取到 Elasticsearch 中,并从 Elasticsearch 中提取记录。我实际上使用的是 Elasticsearch Java 客户端。我在每个 JVM 上只创建一个客户端。使用这个客户端,将数据摄取到 Elasticsearch 中,并使用它从 Elasticsearch 中提取数据。提取数据写入文件并进行一些分析,再次写入文件并将数据摄取回弹性搜索。
这是在一个 JVM 上仅创建一个 Java 客户端并使其保持活动状态的最佳方式吗?
或在需要时创建客户端并摄取/提取数据,关闭它。
或者创建客户端池并重用它。(比如连接池)
做这个的最好方式是什么 ?
我正在尝试将大文件从 S3 读取到块中,而无需为并行处理切割任何行。
举例说明:S3上有1G大小的文件。我想将此文件分成 64 MB 的夹头。我可以很容易地做到:
S3Object s3object = s3.getObject(new GetObjectRequest(bucketName, key));
InputStream stream = s3object.getObjectContent();
byte[] content = new byte[64*1024*1024];
while (stream.read(content) != -1) {
//process content here
}
Run Code Online (Sandbox Code Playgroud)
但是块的问题是它可能有 100 行完整的行和一个不完整的行。但我无法处理不完整的行,也不想丢弃它。
有什么办法可以处理这种情况吗?表示所有夹头都没有局部线。
我对 pyspark 和 python 有点陌生。我正在尝试将 ML 函数作为 pyspark UDF 运行。
这是一个例子:
from pyspark.sql.functions import col, pandas_udf
from pyspark.sql.types import StringType
df = spark.createDataFrame(['Bob has a dog. He loves him'], StringType())
def parse(text):
import spacy
import neuralcoref
nlp = spacy.load('en_core_web_sm')
# Let's try before using the conversion dictionary:
neuralcoref.add_to_pipe(nlp)
doc = nlp(text)
return doc._.coref_resolved
pd_udf = pandas_udf(parse, returnType=StringType())
df.select(pd_udf(col("value"))).show()
Run Code Online (Sandbox Code Playgroud)
出现此错误:
org.apache.spark.api.python.PythonException: Traceback (most recent call last):
File "/home/user/tools/spark-2.4.3-bin-hadoop2.7/python/lib/pyspark.zip/pyspark/worker.py", line 377, in main
process()
File "/home/user/tools/spark-2.4.3-bin-hadoop2.7/python/lib/pyspark.zip/pyspark/worker.py", line 372, in process
serializer.dump_stream(func(split_index, iterator), …Run Code Online (Sandbox Code Playgroud) java ×2
akka ×1
amazon-s3 ×1
apache-spark ×1
asynchronous ×1
aws-java-sdk ×1
io ×1
pandas ×1
pyspark ×1
python ×1
scala ×1
search ×1