TL; DR:我应该如何编写 aDockerfile或 docker 命令来运行 docker 容器,以便在点击 时可以停止并退出正在运行的 docker 容器ctrl+c?
我需要在 shell 脚本中运行无限 while 循环。当我在本地运行此脚本时,该ctrl+c命令将退出无限循环。
# content of sync.sh
while true; do
echo "Do something!"
some_syncing_command || {
rm -rf /tmp/healthy && break
}
echo "Finish doing something!"
touch /tmp/healthy
sleep ${waitingSeconds}
done
Run Code Online (Sandbox Code Playgroud)
因此,基于 shell 脚本,我创建了一个包含以下Dockerfile内容的 Docker 镜像:
FROM debian:stretch
COPY sync.sh .
ENTRYPOINT ["/sync.sh"]
Run Code Online (Sandbox Code Playgroud)
并通过运行构建图像docker build -t infinite-loop .
但是,在尝试不同的尝试运行infinite-loop图像后,我无法在点击后停止并退出正在运行的 docker 容器ctrl + c。以下是我用来运行 …
上下文
我正在使用一个流管道,它在 pubsub 中有一个 protobuf 数据源。我希望将此 protobuf 解析为 python 字典,因为数据接收器要求输入是字典的集合。通过在processDoFn 函数中初始化 protobuf 消息,我成功地开发了 Protobuf Parser 。
但是,我想知道,是否可以在 Beam 上制作一个通用的 ProtobufParser DoFn?从工程角度来看,通用 DoFn 非常有用,可以避免重新实现现有功能并实现代码重用。在 Java 中,我知道我们能够使用泛型,因此在 Java 中实现这个泛型 ProtobufParser 相对容易。由于 Python 函数是一流的对象,我在想是否可以将 Protobuf 模式类(而不是消息实例对象)传递到 DoFn 中。我试图这样做,但我一直失败。
下面是我目前成功的 protobuf 解析器。protobuf 消息在函数内部初始化process。
class ParsePubSubProtoToDict(beam.DoFn):
def process(self, element, *args, **kwargs):
from datapipes.protos.data_pb2 import DataSchema
from google.protobuf.json_format import MessageToDict
message = DataSchema()
message.ParseFromString(element)
obj = MessageToDict(message, preserving_proto_field_name=True)
yield obj
Run Code Online (Sandbox Code Playgroud)
虽然上面的 Protobuf DoFn 解析器工作很好,但它并没有推广到所有的 protobuf 模式,因此这将导致需要为不同的 protobuf 模式重新实现一个新的 …
python protocol-buffers google-cloud-platform google-cloud-dataflow apache-beam
我目前是在 Python 中使用 Apache Beam 和 Dataflow runner 的新手。我有兴趣创建一个发布到 Google Cloud PubSub 的批处理管道,我对 Beam Python API 进行了修改并找到了一个解决方案。然而,在我的探索过程中,我遇到了一些有趣的问题,让我很好奇。
目前,我成功地从 GCS 批量发布数据的光束管道如下所示:
class PublishFn(beam.DoFn):
def __init__(self, topic_path):
self.topic_path = topic_path
super(self.__class__, self).__init__()
def process(self, element, **kwargs):
from google.cloud import pubsub_v1
publisher = pubsub_v1.PublisherClient()
future = publisher.publish(self.topic_path, data=element.encode("utf-8"))
return future.result()
def run_gcs_to_pubsub(argv):
options = PipelineOptions(flags=argv)
from datapipes.common.dataflow_utils import CsvFileSource
from datapipes.protos import proto_schemas_pb2
from google.protobuf.json_format import MessageToJson
with beam.Pipeline(options=options) as p:
normalized_data = (
p |
"Read CSV from GCS" >> …Run Code Online (Sandbox Code Playgroud)