这是我第一次来这里,很抱歉,如果我不发布罚款,抱歉我的英语不好.
我正在尝试配置Apache Flume和Elasticsearch接收器.一切都很好,似乎它运行正常,但是当我启动代理时有2个警告; 以下是:
2015-11-16 09:11:22,122 (lifecycleSupervisor-1-3) [ERROR - org.apache.flume.lifecycle.LifecycleSupervisor$MonitorRunnable.run(LifecycleSupervisor.java:253)] Unable to start SinkRunner: { policy:org.apache.flume.sink.DefaultSinkProcessor@ce359aa counterGroup:{ name:null counters:{} } } - Exception follows.
java.lang.NoSuchMethodError: org.elasticsearch.common.transport.InetSocketTransportAddress.<init>(Ljava/lang/String;I)V
at org.apache.flume.sink.elasticsearch.client.ElasticSearchTransportClient.configureHostnames(ElasticSearchTransportClient.java:143)
at org.apache.flume.sink.elasticsearch.client.ElasticSearchTransportClient.<init>(ElasticSearchTransportClient.java:77)
at org.apache.flume.sink.elasticsearch.client.ElasticSearchClientFactory.getClient(ElasticSearchClientFactory.java:48)
at org.apache.flume.sink.elasticsearch.ElasticSearchSink.start(ElasticSearchSink.java:357)
at org.apache.flume.sink.DefaultSinkProcessor.start(DefaultSinkProcessor.java:46)
at org.apache.flume.SinkRunner.start(SinkRunner.java:79)
at org.apache.flume.lifecycle.LifecycleSupervisor$MonitorRunnable.run(LifecycleSupervisor.java:251)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308)
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180)
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
2015-11-16 09:11:22,137 (lifecycleSupervisor-1-3) [WARN - org.apache.flume.lifecycle.LifecycleSupervisor$MonitorRunnable.run(LifecycleSupervisor.java:260)] Component SinkRunner: { policy:org.apache.flume.sink.DefaultSinkProcessor@ce359aa counterGroup:{ name:null counters:{} } } stopped, since it could not besuccessfully started due to missing dependencies …Run Code Online (Sandbox Code Playgroud) 当火花通过驱动程序提取数据时,我试图用简单的术语,然后当spark不需要通过驱动程序提取数据时.
我有3个问题 -
sc.textfile(path) 或sc.textfile(path).toDF等).如果驱动程序只运行32 GB内存,它会导致驱动程序有OOM吗?或者至少对司机吉姆进行掉期交易?或者spark和hadoop是否足够聪明,可以将数据从HDFS分发到一个Spark执行器,以便在不通过驱动程序的情况下生成数据帧/ RDD?hadoop apache-spark apache-spark-sql spark-dataframe data-ingestion
因为在 Debian stable 上安装非常容易,所以我决定使用 PostgreSQL 9.6 为我需要处理的一些数据构建一个数据仓库。第一步是以最少的转换将数据加载到数据库中,主要是纠正一些已知的格式错误以及布尔值的表示方式。我已经检查过这些更正是否有效:将n行写入磁盘所需的时间与n.
但是,使用 PostgreSQL 批量加载这些数据COPY FROM(无论如何;\copy或 psycopg2copy_expert或COPY FROM '/path/to/data.csv')都需要超线性的时间。渐近时间复杂度似乎比 好一些O(exp(sqrt(n)))。这就是我已经的复杂性:
READ UNCOMMITTED, 和DEFERRED.这是我看到的最严重的违规者之一,一个 17M 的行表:
禁用fsync可将进程加速 10 倍,因此 I/O 显然是一个巨大的瓶颈。然而,除此之外,时间行为并没有太大变化:
当我使用代理键而不是业务键时,这个问题就完全消失了:当我使用自动递增的整数列作为主键时,摄取又需要时间,这正是我想要的。因此,我不仅对我的问题有一个完全有效的解决方法,而且我还知道复杂的主键是罪魁祸首(业务键通常是短 VARCHAR 列的元组)。?(n)
但是,我想了解为什么 PostgreSQL 在以业务密钥为键的情况下要花这么长时间来摄取数据,以便我更好地了解我的工具。特别是,我不知道如何调试这个摄取过程,因为EXPLAIN不适用于COPY. 对于复合主键,可能将数据排序到存储中需要更长的时间,或者这是由于索引,或者主键约束实际上仍然是NOT DEFERRED; 如果解决方法不会那么有效,或者由于其他原因不受欢迎,我如何才能发现这里实际发生了什么?
我正在使用 Apache NiFi 来摄取和预处理一些 CSV 文件,但是在长时间运行时,它总是失败。错误总是一样的:
FlowFile Repository failed to update
Run Code Online (Sandbox Code Playgroud)
在日志中搜索,我总是看到这个错误:
2018-07-11 22:42:49,913 ERROR [Timer-Driven Process Thread-10] o.a.n.p.attributes.UpdateAttribute UpdateAttribute[id=c7f45dc9-ee12-31b0-8dee-6f1746b3c544] Failed to process session due to org.apache.nifi.processor.exception.ProcessException: FlowFile Repository failed to update: org.apache.nifi.processor.exception.ProcessException: FlowFile Repository failed to update
org.apache.nifi.processor.exception.ProcessException: FlowFile Repository failed to update
at org.apache.nifi.controller.repository.StandardProcessSession.commit(StandardProcessSession.java:405)
at org.apache.nifi.controller.repository.StandardProcessSession.commit(StandardProcessSession.java:336)
at org.apache.nifi.processor.AbstractProcessor.onTrigger(AbstractProcessor.java:28)
at org.apache.nifi.controller.StandardProcessorNode.onTrigger(StandardProcessorNode.java:1165)
at org.apache.nifi.controller.tasks.ConnectableTask.invoke(ConnectableTask.java:203)
at org.apache.nifi.controller.scheduling.TimerDrivenSchedulingAgent$1.run(TimerDrivenSchedulingAgent.java:117)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308)
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180)
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
Caused by: java.io.IOException: **Cannot update journal file ./flowfile_repository/journals/8772495.journal because this …Run Code Online (Sandbox Code Playgroud) 我正在使用 Cloud Run 创建数据提取管道。每次通过 Pub Sub 将文件放入 GCS 存储桶时,我的 Cloud Run api 都会被调用。我需要加载一些元数据,其中包含我正在摄取的数据的文本。此元数据很少更改。我显然不想在每次执行时将其重新加载到内存中。我最好的选择是什么?到目前为止我能够研究的是:
选项1
如果在每个服务请求上重新创建对象的成本很高,您还可以将对象缓存在内存中。将其从请求逻辑转移到全局范围会带来更好的性能。 https://cloud.google.com/run/docs/tips#run_tips_global_scope-java
在此链接给出的示例中,heavyComputation 函数仅在冷启动时调用一次吗?如果我需要在元数据更新时偶尔重新触发此功能怎么办?我还发现以下信息令人不安,因为它似乎说不能保证其他实例会重用该对象。
在 Cloud Run 中,您不能假设服务状态在请求之间保留。但是,Cloud Run 确实会重用各个容器实例来服务持续的流量,因此您可以在全局范围内声明一个变量,以允许在后续调用中重用其值。无法提前知道是否有任何单独的请求能够从这种重用中获益。
选项2
使用 Redis 或云内存存储之类的东西,只要有变化,云功能就会更新。所有云运行 api 的实例都从 Redis 中提取元数据信息。这会比选项 1 的性能更低还是更高吗?这还有其他缺点吗?
如果还有其他更好的方法可以做到这一点,我会很感兴趣。
更新 1:我更多地考虑了这一点,由于每个租户的元数据都会有所不同,并且每次调用云运行代码都会为一个租户摄取一个文件,因此加载所有租户是一个坏主意每次执行时的元数据,即使已缓存。不过,我可能会在每个租户的项目中运行单独的云。
shared-memory data-ingestion google-cloud-memorystore google-cloud-run
问题
使用 Snowpipe 加载小文件(例如 4K)比 16K、500K 或 1-10Mb(推荐的文件大小)贵多少。注意:这个问题意味着加载小文件比推荐的 1-10Mb 更昂贵。
了解最佳做法是加载大小为 1-10Mb 的文件,但我需要近乎实时的交付(几分钟)。我可以连接文件使它们更大,但不能等待超过 60 秒才能将微批处理发送到 S3,因此也发送到 Snowpipe。我目前每 30 秒写一次我有的东西,但我每 60 秒看到一次 Snowpipe 报告。这是否意味着将文件写入 S3 的频率超过 60 秒没有意义?IE。如果我每 30 秒发送一次文件,它实际上会减少平均延迟还是 60 秒是最小雪管周期。
加载 4K 文件(每天大约 200Mb,每个文件 4K),每 GB 花费大约 20 个积分,这是非常昂贵的。如果我加载(例如)1-10Mb 范围内的 CSV 文件,使用 Snowpipe 的每 GB 成本应该是多少?如果我保持在 1-10Mb 范围内,我的每千兆字节成本会下降吗?
是否有任何更快/更便宜的替代方法可以将数据导入 Snowflake?注意:目前使用 Parquet 格式的 Snowpipe 到 VARIANT,然后使用 STREAMS 和 TASKS 重构数据以进行近实时分析。了解使用 Snowpipe 比使用虚拟仓库更便宜。这是真的?我怀疑真正的答案是“视情况而定”。但是“取决于什么”。
除了我的近实时要求之外,我还有许多系统提供批处理提要(CSV 格式,大约每 4 小时一次,预计在 30 分钟内处理和呈现以供分析)。文件大小在这里各不相同,但大多数是 1Mb到 1Gb 范围。我应该使用相同的 Snowpipe 解决方案,还是我最好从 Airflow 编排工作并在专用虚拟仓库上使用 COPY 命令和 SQL 语句?或者实际上,您会推荐什么替代方案?
我可以看到 …
众所周知,AWS Timestream 已于上周正式发布。
从那时起,我一直在尝试对它进行试验并了解它如何建模和存储数据。
我在将记录摄取到 Timestream 时遇到问题。我有一些日期为 2020 年 4 月 23 日的记录。在尝试将这些记录插入时间流表时,我收到 RecordRejected 错误。根据此链接,如果记录具有相同的维度、时间戳或时间戳超出表的内存存储的保留期,则会拒绝记录。
我已将表的内存存储的保留期设置为 12 个月。根据文档:任何时间戳超过 12 个月的记录都将被拒绝。然而,尽管上面提到的记录在 12 个月内有时间戳,但还是被拒绝了。
在进一步调查中,我注意到,今天日期(2020 年 10 月 5 日)的记录被成功摄取,但是,日期为 30 天前的记录(即 2020 年 9 月 5 日)不会被摄取。为确保这一点,我还尝试插入日期为 9 月 6 日和今天日期和 9 月 5 日之间还有几天的记录。所有这些都已成功插入。
有人可以解释为什么我无法在内存存储的保留期内插入具有时间戳的记录吗?它只允许我插入最多 30 天的记录。我也想知道是否有一种方法可以将历史数据直接插入到磁存储中。内存存储保留期可能不足以满足我的用例,我可能需要插入 2 年或更长时间的数据。我知道这不是时间流的经典用例,但我仍然很想知道。
我被困在这个问题上,非常感谢一些帮助。
先感谢您。
我正在编写一个应用程序,用于绘制财务数据并与此类数据的实时反馈进行交互。由于任务的性质,可能会以一次一笔交易的方式非常频繁地接收实时市场数据。我在本地使用数据库,并且我是唯一的用户。只有一个程序(我的中间件)会将数据插入数据库。我最关心的是延迟——我想尽可能地减少它。出于这个原因,我想避免使用队列(从某种意义上说,我希望缓冲区表来履行该角色)。Clickhouse 为我计算的许多分析预计也是实时的(尽可能)。我有三个问题:
\n问题 1)澄清缓冲区表文档中的一些限制/警告
\n根据 Clickhouse 文档,我了解到许多小型插入至少可以说是次优的。在研究该主题时,我发现缓冲区引擎 [1] 可以用作解决方案。这对我来说很有意义,但是当我阅读 Buffer 的文档时,我发现了一些警告:
\n\n\n请注意,一次一行插入数据是没有意义的,即使对于缓冲区表也是如此。这只会产生每秒几千行的速度,而插入更大的数据块可以每秒产生超过一百万行(请参阅 \xe2\x80\x9cPerformance\xe2\x80\x9d 部分)。
\n
每秒几千行对我来说绝对没问题,但是我担心其他性能考虑因素 - 如果我一次将一行数据提交到缓冲表,我是否应该预期 CPU/内存会出现峰值?如果我理解正确的话,一次向 MergeTree 表提交一行会给合并作业带来大量额外的工作,但如果使用 Buffer Table,这应该不是问题,对吗?
\n\n\n如果服务器异常重启,缓冲区中的数据就会丢失。
\n
据我所知,这是指停电或计算机崩溃之类的事情。如果我正常关闭计算机或正常停止clickhouse服务器,我可以期望缓冲区将数据刷新到目标表吗?
\n问题2)阐明查询的工作原理(常规查询+物化视图)
\n\n\n从缓冲区表读取数据时,将从缓冲区和目标表(如果有)中处理数据。\n请注意,缓冲区表不支持索引。换句话说,缓冲区中的数据被完全扫描,这对于大缓冲区来说可能会很慢。(对于下级表中的数据,将使用其支持的索引。)
\n
这是否意味着我可以对目标表使用查询并期望自动包含缓冲区表数据?或者是相反 - 我查询缓冲表并且目标表包含在后台?如果任一为真(并且我不需要手动聚合两个表),这是否也意味着将填充物化视图?哪个表应该触发物化视图 - 磁盘表还是缓冲表?或者以某种方式两者兼而有之?
\n我非常依赖物化视图,并且需要它们实时更新(或尽可能接近)。实现这一目标的最佳策略是什么?
\n问题3)当我在刷新数据时查询数据库时会发生什么?
\n我在这里主要关心的两个问题是:
\n感谢您的时间。
\n[1] https://clickhouse.tech/docs/en/engines/table-engines/special/buffer/ …
欲望:
我想要一种方法来合并两个数据帧并保留指定数据帧中的非相交数据。
问题:
我有重复的数据,我希望这一行能够删除重复的数据:
final_df = new_df[~new_df.isin(previous_df)].dropna()
Run Code Online (Sandbox Code Playgroud)
示例数据及数据测试:
record = Record(1000, 9300815, '<redacted type>', '<redacted id>')
test_df = pd.DataFrame([record])
if not final_df.empty:
# this produces an empty data frame
empty_df = test_df[test_df.isin(final_df)].dropna()
# this produces the record
record_df = final_pdf[final_pdf.col01 == record.col01]
Run Code Online (Sandbox Code Playgroud)
背景:
我正在加载 xml 数据并将 xml 文件转换为几种不同的记录类型作为命名元组。我将每个记录类型拆分为自己的数据帧。previous_df然后,我通过如下构造将 xml 文件中的当前数据集与已加载到数据库中的数据进行比较:
previous_df = pd.read_sql_table(table_name, con=conn, schema=schema, columns=columns)
Run Code Online (Sandbox Code Playgroud)
列是根据命名元组中的字段动态创建的。数据库模式是使用 sqlalchemy 生成的,UniqueConstraint当我认为数据库中存在重复项时,我添加了管理。
预先感谢您提供的任何帮助。
我正在尝试使用 Google Data Fusion 免费版本将简单的 CSV 文件从 GCS 加载到 BQ。管道因错误而失败。它读着
com.google.api.gax.rpc.InvalidArgumentException: io.grpc.StatusRuntimeException: INVALID_ARGUMENT: Insufficient 'DISKS_TOTAL_GB' quota. Requested 3000.0, available 2048.0.
at com.google.api.gax.rpc.ApiExceptionFactory.createException(ApiExceptionFactory.java:49) ~[na:na]
at com.google.api.gax.grpc.GrpcApiExceptionFactory.create(GrpcApiExceptionFactory.java:72) ~[na:na]
at com.google.api.gax.grpc.GrpcApiExceptionFactory.create(GrpcApiExceptionFactory.java:60) ~[na:na]
at com.google.api.gax.grpc.GrpcExceptionCallable$ExceptionTransformingFuture.onFailure(GrpcExceptionCallable.java:97) ~[na:na]
at com.google.api.core.ApiFutures$1.onFailure(ApiFutures.java:68) ~[na:na]
Run Code Online (Sandbox Code Playgroud)
Mapreduce 和 Spark 执行管道都会重复相同的错误。感谢您为解决此问题提供的任何帮助。谢谢
问候卡
data-processing google-cloud-platform data-ingestion data-pipeline google-cloud-data-fusion
data-ingestion ×10
postgresql ×2
apache-nifi ×1
apache-spark ×1
clickhouse ×1
constraints ×1
flume ×1
hadoop ×1
journal ×1
pandas ×1
python ×1
snowflake-cloud-data-platform ×1
sqlalchemy ×1