Mar*_*łek 5 networking dart grpc flutter
我正在使用 gRPC 开发 Flutter 应用程序并且一切正常,直到我决定看看如果没有互联网连接会发生什么。
这样做并提出请求后,我收到以下错误:
E/flutter (26480):gRPC 错误(14,调用错误:错误状态:http/2 连接不再处于活动状态,因此不能用于创建新流。)
问题是即使重新启用连接后,错误仍然发生。
我必须重新创建 clientChannel 吗?
const String serverUrl = 'theaddress.com';
const int serverPort = 50051;
final ClientChannel defaultClientChannel = ClientChannel(
serverUrl,
port: serverPort,
options: const ChannelOptions(
credentials: const ChannelCredentials.insecure(),
),
);
Run Code Online (Sandbox Code Playgroud)
我只想抛出一些错误,但一旦互联网连接恢复正常工作。
根据 @Ishaan 的建议,我使用 Connectivity 包创建了一个客户端,该客户端在互联网恢复时重新连接。到目前为止,它似乎正在发挥作用。
import 'dart:async';
import 'package:connectivity/connectivity.dart';
import 'package:flutter_worker_app/generated/api.pbgrpc.dart';
import 'package:grpc/grpc.dart';
import 'package:rxdart/rxdart.dart';
class ConnectiveClient extends ApiClient {
final CallOptions _options;
final Connectivity _connectivity;
ClientChannel _channel;
bool hasRecentlyFailed = false;
ConnectiveClient(this._connectivity, this._channel, {CallOptions options})
: _options = options ?? CallOptions(),
super(_channel) {
//TODO: Cancel connectivity subscription
_connectivity.onConnectivityChanged.listen((result) {
if (hasRecentlyFailed && result != ConnectivityResult.none) {
_restoreChannel();
}
});
}
///Create new channel from original channel
_restoreChannel() {
_channel = ClientChannel(_channel.host,
port: _channel.port, options: _channel.options);
hasRecentlyFailed = false;
}
@override
ClientCall<Q, R> $createCall<Q, R>(
ClientMethod<Q, R> method, Stream<Q> requests,
{CallOptions options}) {
//create call
BroadcastCall<Q, R> call = createChannelCall(
method,
requests,
_options.mergedWith(options),
);
//listen if there was an error
call.response.listen((_) {}, onError: (Object error) async {
//Cannot connect - we assume it's internet problem
if (error is GrpcError && error.code == StatusCode.unavailable) {
//check connection
_connectivity.checkConnectivity().then((result) {
if (result != ConnectivityResult.none) {
_restoreChannel();
}
});
hasRecentlyFailed = true;
}
});
//return original call
return call;
}
/// Initiates a new RPC on this connection.
/// This is copy of [ClientChannel.createCall]
/// The only difference is that it creates [BroadcastCall] instead of [ClientCall]
ClientCall<Q, R> createChannelCall<Q, R>(
ClientMethod<Q, R> method, Stream<Q> requests, CallOptions options) {
final call = new BroadcastCall(method, requests, options);
_channel.getConnection().then((connection) {
if (call.isCancelled) return;
connection.dispatchCall(call);
}, onError: call.onConnectionError);
return call;
}
}
///A ClientCall that can be listened multiple times
class BroadcastCall<Q, R> extends ClientCall<Q, R> {
///I wanted to use super.response.asBroadcastStream(), but it didn't work.
///I don't know why...
BehaviorSubject<R> subject = BehaviorSubject<R>();
BroadcastCall(
ClientMethod<Q, R> method, Stream<Q> requests, CallOptions options)
: super(method, requests, options) {
super.response.listen(
(data) => subject.add(data),
onError: (error) => subject.addError(error),
onDone: () => subject.close(),
);
}
@override
Stream<R> get response => subject.stream;
}
Run Code Online (Sandbox Code Playgroud)