import tensorflow as tf

B = 3  # batch size
D = 4  # feature dimension
T = 5  # number of time steps

tf.reset_default_graph()
xs = tf.placeholder(shape=[T, B, D], dtype=tf.float32)

with tf.variable_scope("RNN"):
    GRUcell = tf.contrib.rnn.GRUCell(num_units=D)
    cell = tf.contrib.rnn.MultiRNNCell([GRUcell])

output_ta = tf.TensorArray(size=T, dtype=tf.float32)
input_ta = tf.TensorArray(size=T, dtype=tf.float32)
# TensorArray operations are functional: unstack/write return a new
# TensorArray, so the result must be kept.
input_ta = input_ta.unstack(xs)

def body(time, output_ta_t, state):
    xt = input_ta.read(time)
    new_output, new_state = cell(xt, state)
    output_ta_t = output_ta_t.write(time, new_output)
    return (time + 1, output_ta_t, new_state)

def condition(time, output, state):
    return time < T

time = 0
state = cell.zero_state(B, tf.float32)

time_final, output_ta_final, state_final = tf.while_loop(
    cond=condition,
    body=body,
    loop_vars=(time, …

Dynamic Links that I generate programmatically are not picked up correctly by
FirebaseDynamicLinks.instance.getInitialLink()
when the app has been closed. However, if the app is open, the listener correctly detects a newly incoming dynamic link. It is unclear to me whether this is a setup problem or an issue with how I generate the dynamic link.
Reproduction

First, set up Firebase for the Flutter project by following the documentation. Then set up the dynamic link:
/// See also
/// https://firebase.google.com/docs/dynamic-links/use-cases/rewarded-referral
/// for how to implement referral schemes with Firebase.
Future<ShortDynamicLink> buildDynamicLink(String userId) async {
  final PackageInfo packageInfo = await PackageInfo.fromPlatform();
  final String packageName = packageInfo.packageName;

  var androidParams = AndroidParameters(
    packageName: packageInfo.packageName,
    minimumVersion: Constants.androidVersion, // app version, not the Android OS version
  );

  var iosParams = IosParameters(
    bundleId: packageInfo.packageName,
    minimumVersion: Constants.iosVersion, // app version, not the iOS version
    appStoreId: Constants.iosAppStoreId,
  );

  var socialMetaTagParams = SocialMetaTagParameters(
    title: 'Referral …

I am trying to use the TimeBasedPartitioner of the Confluent S3 sink. Here is my configuration:
{
  "name": "s3-sink",
  "config": {
    "connector.class": "io.confluent.connect.s3.S3SinkConnector",
    "tasks.max": "1",
    "file": "test.sink.txt",
    "topics": "xxxxx",
    "s3.region": "yyyyyy",
    "s3.bucket.name": "zzzzzzz",
    "s3.part.size": "5242880",
    "flush.size": "1000",
    "storage.class": "io.confluent.connect.s3.storage.S3Storage",
    "format.class": "io.confluent.connect.s3.format.avro.AvroFormat",
    "schema.generator.class": "io.confluent.connect.storage.hive.schema.DefaultSchemaGenerator",
    "partitioner.class": "io.confluent.connect.storage.partitioner.TimeBasedPartitioner",
    "timestamp.extractor": "Record",
    "timestamp.field": "local_timestamp",
    "path.format": "YYYY-MM-dd-HH",
    "partition.duration.ms": "3600000",
    "schema.compatibility": "NONE"
  }
}
The data is binary and I use an Avro schema. I would like to use the actual record field "local_timestamp", which is a UNIX timestamp, to partition the data, say into hourly files.

I start the connector with the usual REST API call:
curl -X POST -H "Content-Type: application/json" --data @s3-config.json http://localhost:8083/connectors
Unfortunately, the data is not partitioned the way I want. I also tried removing flush.size, since it might interfere, but then I got the error:
{"error_code":400,"message":"Connector configuration is invalid and contains the following 1 error(s):\nMissing required configuration \"flush.size\" which has no default value.\nYou can also find the above list of errors at the endpoint `/{connectorType}/config/validate`"}%
Any idea how to set up the TimeBasedPartitioner correctly? I could not find a working example.

Also, how can one debug such a problem or gain further insight into what the connector is actually doing?

Any help or further suggestions are greatly appreciated.
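For orientation while debugging, the bucketing an hourly TimeBasedPartitioner performs can be sketched as plain timestamp arithmetic. This is a stdlib-only illustration of the path logic, not the connector's code; note also that, per the Confluent docs, the `Record` extractor uses the Kafka record's own timestamp, extracting from a record field such as `local_timestamp` is the job of the `RecordField` extractor, and TimeBasedPartitioner additionally expects `locale` and `timezone` settings.

```python
from datetime import datetime, timezone

def partition_path(ts_ms: int, duration_ms: int = 60 * 60 * 1000) -> str:
    """Floor a UNIX timestamp (milliseconds) to its partition bucket and
    render it the way path.format=YYYY-MM-dd-HH would."""
    bucket_start_ms = ts_ms - ts_ms % duration_ms  # floor to the hour
    dt = datetime.fromtimestamp(bucket_start_ms / 1000, tz=timezone.utc)
    return dt.strftime("%Y-%m-%d-%H")

# A record with local_timestamp 2018-03-01T04:10:30Z lands in the 04:00 bucket:
print(partition_path(1519877430000))  # -> 2018-03-01-04
```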
I have to deal with time zones and nanosecond time resolution, so I use ZonedDateTime. Apparently, Apache Flink does not serialize ZonedDateTime correctly: it serializes the LocalDateTime part as expected but forgets to handle the time zone.

For example, when I log the partition date in a Flink streaming map function, I always get something like
2018-03-01T04:10:30.773471918null
whereas at the start of the data I get the correct zone:
2018-03-01T04:10:30.773471918-05:00
The null refers to the zone. Later on, this naturally leads to a NullPointerException, because I have to perform proper time comparisons, which require the zone.

What is the simplest way to fix this? Thanks for your replies.
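One generic workaround, independent of Flink's serializer internals, is to convert the zoned value into a self-describing representation (e.g. an ISO-8601 string, or an epoch value plus a zone id) at the operator boundary, so no serializer can silently drop the zone. A minimal sketch of the idea in Python (names and representation are illustrative only):

```python
from datetime import datetime, timezone, timedelta

def encode(dt: datetime) -> str:
    # Serialize offset and local part together; an ISO-8601 string cannot
    # lose its zone the way a partially supported type can.
    if dt.tzinfo is None:
        raise ValueError("refusing a naive datetime: the zone would be lost")
    return dt.isoformat()

def decode(s: str) -> datetime:
    return datetime.fromisoformat(s)

dt = datetime(2018, 3, 1, 4, 10, 30, 773471,
              tzinfo=timezone(timedelta(hours=-5)))
assert decode(encode(dt)) == dt  # the zone survives the round trip
```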
I am trying to understand the differences between the various kinds of state that can be used inside a ProcessWindowFunction.

First of all, ProcessWindowFunction is an AbstractRichFunction:
abstract class ProcessWindowFunction[IN, OUT, KEY, W <: Window]
extends AbstractRichFunction {...}
It can therefore use the method
public RuntimeContext getRuntimeContext()
to obtain a state:
getRuntimeContext().getState
Furthermore, the process method of ProcessWindowFunction

def process(key: KEY, context: Context, elements: Iterable[IN],
            out: Collector[OUT]) {}

takes a Context, which in turn offers two methods that let me obtain state:
/**
* State accessor for per-key and per-window state.
*/
def windowState: KeyedStateStore
/**
* State accessor for per-key global state.
*/
def globalState: KeyedStateStore
Here are my questions:

1) How do these relate to getRuntimeContext().getState?

2) I often use custom Trigger implementations together with global windows. In that case, state is retrieved with getPartitionedState. Can I access, inside the trigger, the window state defined in the ProcessWindowFunction? If so, how?

3) The Trigger class has no open method to override, so how is state creation handled? Is it safe to simply call getPartitionedState, which also takes care of state creation?
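To make the scoping difference between the two accessors concrete: windowState is scoped to (key, window) and is dropped when its window is purged, while globalState is scoped to the key alone and survives across windows. A conceptual sketch of those two granularities (plain Python, not Flink API):

```python
# windowState: one slot per (key, window); cleared together with the window.
window_state = {}
# globalState: one slot per key; survives across windows.
global_state = {}

def process(key, window, elements):
    window_state[(key, window)] = sum(elements)                    # per-window aggregate
    global_state[key] = global_state.get(key, 0) + sum(elements)   # running total

process("a", "w1", [1, 2])
process("a", "w2", [3])
print(window_state[("a", "w2")], global_state["a"])  # -> 3 6
```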
I would like to have mutable state in an F# object expression. A first approach is to use a ref cell as follows:
type PP =
    abstract member A : int

let foo =
    let a = ref 0
    { new PP with
        member x.A =
            let ret = !a
            a := !a + 1
            ret
    }

printfn "%A" foo.A
printfn "%A" foo.A
printfn "%A" foo.A
printfn "%A" foo.A
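For comparison, the ref-cell version above is simply mutable state captured by a closure; the same shape expressed in Python (illustrative only, not a recommendation between the two F# styles):

```python
def make_pp():
    # `a` plays the role of the F# ref cell captured by the object expression
    a = 0
    def A():
        nonlocal a
        ret = a
        a += 1
        return ret
    return A

foo = make_pp()
print([foo() for _ in range(4)])  # -> [0, 1, 2, 3]
```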
A different approach is the following:
type State(s : int) =
    let mutable intState = s
    member x.state
        with get () = intState
        and set v = intState <- v

[<AbstractClass>]
type PPP(state : State) =
    abstract member …

I would like to use the same record type multiple times in an Avro schema. Consider this schema definition:
{
  "type": "record",
  "name": "OrderBook",
  "namespace": "my.types",
  "doc": "Test order updates",
  "fields": [
    {
      "name": "bids",
      "type": {
        "type": "array",
        "items": {
          "type": "record",
          "name": "OrderBookVolume",
          "namespace": "my.types",
          "fields": [
            {
              "name": "price",
              "type": "double"
            },
            {
              "name": "volume",
              "type": "double"
            }
          ]
        }
      }
    },
    {
      "name": "asks",
      "type": {
        "type": "array",
        "items": {
          "type": "record",
          "name": "OrderBookVolume",
          "namespace": "my.types",
          "fields": [
            {
              "name": "price",
              "type": "double"
            },
            {
              "name": "volume",
              " …
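Avro's answer to this is named-type reuse: a record (or enum/fixed) defined once under a full name can afterwards be referenced by that name instead of being re-declared. The sketch below is stdlib-only (no Avro library; the tiny schema walker is purely illustrative) and shows a schema where the asks array refers to OrderBookVolume by name:

```python
import json

# "OrderBookVolume" is defined once (inside "bids") and then referenced
# by its full name inside "asks" instead of being declared a second time.
SCHEMA = json.loads("""
{
  "type": "record", "name": "OrderBook", "namespace": "my.types",
  "fields": [
    {"name": "bids", "type": {"type": "array", "items": {
       "type": "record", "name": "OrderBookVolume", "namespace": "my.types",
       "fields": [{"name": "price",  "type": "double"},
                  {"name": "volume", "type": "double"}]}}},
    {"name": "asks", "type": {"type": "array",
                              "items": "my.types.OrderBookVolume"}}
  ]
}
""")

def named_records(node, found=None):
    """Collect every record definition in the schema, keyed by full name."""
    found = {} if found is None else found
    if isinstance(node, dict):
        if node.get("type") == "record":
            found["%s.%s" % (node.get("namespace", ""), node["name"])] = node
        for child in node.values():
            named_records(child, found)
    elif isinstance(node, list):
        for child in node:
            named_records(child, found)
    return found

defs = named_records(SCHEMA)
ref = SCHEMA["fields"][1]["type"]["items"]   # the bare string reference
print(ref, ref in defs)  # -> my.types.OrderBookVolume True
```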