我正在尝试在图表中可视化数据流,其中数据可以在一个或多个方向上流动.一些演员将数据推送到图表中的另一个演员,其他演员拉动数据.
从我所能找到的,在uml中没有它的符号,但我可能是错的.在这样的图表中传达谁的行为部分以及数据流的方向的好方法是什么?
我有写在目录结构中的数据文件(在此示例中为json,但也可能是avro):
dataroot
+-- year=2015
+-- month=06
+-- day=01
+-- data1.json
+-- data2.json
+-- data3.json
+-- day=02
+-- data1.json
+-- data2.json
+-- data3.json
+-- month=07
+-- day=20
+-- data1.json
+-- data2.json
+-- data3.json
+-- day=21
+-- data1.json
+-- data2.json
+-- data3.json
+-- day=22
+-- data1.json
+-- data2.json
Run Code Online (Sandbox Code Playgroud)
使用spark-sql创建一个临时表:
CREATE TEMPORARY TABLE dataTable
USING org.apache.spark.sql.json
OPTIONS (
path "dataroot/*"
)
Run Code Online (Sandbox Code Playgroud)
查询表效果很好,但到目前为止,我无法使用目录进行修剪。
有没有一种方法可以将目录结构注册为分区(不使用Hive)以避免在查询时扫描整个树?假设我想比较每个月第一天的数据,并且只读取这几天的目录。
使用Apache Drill,我可以在查询期间使用目录作为谓词,dir0
等等。是否可以使用Spark SQL做类似的事情?
我阅读了 Kafka wiki,对这张图片有一些问题。
对于Consumer Group A,C1,C2,只能接收两个partition消息,像C1只接收P0,C2只接收P1?
据我所知,一个Consumer Group映射一个Topic,所以C1、C2的Topic一定是一样的,所以PO、P1、P2、P3的Topic也是一样的,对吗?
所以有一个矛盾,如果问题2是对的,那么消费者组A和消费者组B有相同的主题,所以一个消费者组映射一个主题的矛盾。
C1如何控制P0,P1消息,如果P0,P1有相同的topic,就意味着C1会收到重复的消息,如果不是,C1如何控制只有一个偏移量的不同消息?
关于“所有分区都包含相同的主题,至少我是这样解释这张图片的”的问题。所以我假设,同一个主题名为“test”,然后一个生产者为这个主题生成消息“Hello test”,这意味着 C1 、C2、C3、C4都会收到同样的消息吗?对于第四个答案,C1 仍然收到两次“Hello test”?
CG-A 或 CG-B 能收到不同主题的消息吗?
我没有看到 Consumer Group 的任何优势,“有时从 Kafka 读取消息的逻辑并不关心处理消息偏移量,它只需要数据。因此提供高级消费者来抽象大部分细节消费来自 Kafka 的事件。” 来自 Kafka 的 wiki,你能给我一个关于这张图片的消费者组的例子,比如你种子 CG-A 正在报告任务,而 CG-B 正在监控?
这是否意味着来自一个名为“test”的主题的 P0、P1、P2、P3 会发送不同的消息?但我关注了 Kafka 的 wiki,比如:
一种。bin/kafka-server-start.sh 配置/server.properties
湾 bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 3 --topic test //分区为3
C。bin/kafka-console-producer.sh --broker-list localhost:9092 --topic 测试
d. bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic test --from-beginning
然后我在生产者中输入一些东西,然后消费者会显示这些信息?
那么这三个分区怎么会有不同的信息呢?
非常感谢
我使用的dateutil.relativedelta()
是time_unit
在我的age
-tuple和代码中对应的命名参数,以获得相对时间的样子:
def time_delta(age):
now = datetime.fromtimestamp(int(time.time()))
if age.time_unit == "seconds":
relative_time = now - relativedelta(seconds=int(age.value))
elif age.time_unit == "minutes":
relative_time = now - relativedelta(minutes=int(age.value))
elif age.time_unit == "hours":
relative_time = now - relativedelta(hours=int(age.value))
elif age.time_unit == "days":
relative_time = now - relativedelta(days=int(age.value))
elif age.time_unit == "weeks":
relative_time = now - relativedelta(weeks=int(age.value))
elif age.time_unit == "months":
relative_time = now - relativedelta(months=int(age.value))
elif age.time_unit == "years":
relative_time = now - relativedelta(years=int(age.value))
Run Code Online (Sandbox Code Playgroud)
在Python 2.7中是否有任何方法可以使用以下内容:
relative_time = now …
Run Code Online (Sandbox Code Playgroud)