小编Sky*_*Sky的帖子

在Akka actor的receive方法中处理异步调用的最佳方法

我试图将数据持久化到数据库中.我的持久化方法是异步的.

class MyActor(persistenceFactory:PersistenceFactory) extends Actor {
  def receive: Receive = {
    case record: Record =>
      // this method is asynchronous, immediate return Future[Int]
      persistenceFactory.persist(record) 
  }
}
Run Code Online (Sandbox Code Playgroud)

当应用程序在增加的负载下运行时,瓶颈就是我们内存不足或没有线程可用.

那么在Akka actor的receive方法中处理异步调用的最佳方法是什么?

parallel-processing multithreading asynchronous scala akka

9
推荐指数
1
解决办法
555
查看次数

使用 Java 客户端从 Elasticsearch 导入/导出数据的最佳方式

我正在将数百万条记录摄取到 Elasticsearch 中,并从 Elasticsearch 中提取记录。我实际上使用的是 Elasticsearch Java 客户端。我在每个 JVM 上只创建一个客户端。使用这个客户端,将数据摄取到 Elasticsearch 中,并使用它从 Elasticsearch 中提取数据。提取数据写入文件并进行一些分析,再次写入文件并将数据摄取回弹性搜索。

  1. 这是在一个 JVM 上仅创建一个 Java 客户端并使其保持活动状态的最佳方式吗?

  2. 或在需要时创建客户端并摄取/提取数据,关闭它。

  3. 或者创建客户端池并重用它。(比如连接池)

做这个的最好方式是什么 ?

java search elasticsearch

8
推荐指数
1
解决办法
339
查看次数

如何使用 aws-java-sdk 从 S3 逐块读取文件

我正在尝试将大文件从 S3 读取到块中,而无需为并行处理切割任何行。

举例说明:S3上有1G大小的文件。我想将此文件分成 64 MB 的夹头。我可以很容易地做到:

S3Object s3object = s3.getObject(new GetObjectRequest(bucketName, key));

InputStream stream = s3object.getObjectContent();

byte[] content = new byte[64*1024*1024];

while (stream.read(content)  != -1) {

//process content here 

}
Run Code Online (Sandbox Code Playgroud)

但是块的问题是它可能有 100 行完整的行和一个不完整的行。但我无法处理不完整的行,也不想丢弃它。

有什么办法可以处理这种情况吗?表示所有夹头都没有局部线。

java io amazon-s3 amazon-web-services aws-java-sdk

7
推荐指数
2
解决办法
1万
查看次数

ML 函数作为 pyspark UDF

我对 pyspark 和 python 有点陌生。我正在尝试将 ML 函数作为 pyspark UDF 运行。

这是一个例子:

from pyspark.sql.functions import col, pandas_udf
from pyspark.sql.types import StringType

df = spark.createDataFrame(['Bob has a dog. He loves him'], StringType())

def parse(text):
    import spacy
    import neuralcoref
    nlp = spacy.load('en_core_web_sm')
    # Let's try before using the conversion dictionary:
    neuralcoref.add_to_pipe(nlp)
    doc = nlp(text)
    return doc._.coref_resolved

 pd_udf = pandas_udf(parse, returnType=StringType())

 df.select(pd_udf(col("value"))).show()
Run Code Online (Sandbox Code Playgroud)

出现此错误:

org.apache.spark.api.python.PythonException: Traceback (most recent call last):
  File "/home/user/tools/spark-2.4.3-bin-hadoop2.7/python/lib/pyspark.zip/pyspark/worker.py", line 377, in main
    process()
  File "/home/user/tools/spark-2.4.3-bin-hadoop2.7/python/lib/pyspark.zip/pyspark/worker.py", line 372, in process
    serializer.dump_stream(func(split_index, iterator), …
Run Code Online (Sandbox Code Playgroud)

python pandas apache-spark apache-spark-sql pyspark

5
推荐指数
1
解决办法
809
查看次数