I am trying to follow this example, but when I try to compile and run it, I get this error:
Error: Unable to initialize main class com.amazonaws.services.kinesisanalytics.aws
Caused by: java.lang.NoClassDefFoundError: org/apache/flink/streaming/api/functions/source/SourceFunction
The error is caused by this code:
private static DataStream<String> createSourceFromStaticConfig(StreamExecutionEnvironment env) {
    Properties inputProperties = new Properties();
    inputProperties.setProperty(ConsumerConfigConstants.AWS_REGION, region);
    inputProperties.setProperty(ConsumerConfigConstants.STREAM_INITIAL_POSITION, "LATEST");
    return env.addSource(new FlinkKinesisConsumer<>(inputStreamName, new SimpleStringSchema(), inputProperties));
}
I think this is the problematic line:
return env.addSource(new FlinkKinesisConsumer<>(inputStreamName, new SimpleStringSchema(), inputProperties));
These are my Maven dependencies. What could be the problem: a dependency, a version?
Note: if I comment out the problematic line, the program runs without issues.
The POM file:
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <groupId>as</groupId>
    <artifactId>a</artifactId>
    <version>1</version>
    <packaging>jar</packaging>

    <properties>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <geotools.version>21-SNAPSHOT</geotools.version>
        <java.version>1.8</java.version>
        <scala.binary.version>2.11</scala.binary.version>
        <flink.version>1.6.2</flink.version>
        <kda.version>1.0.1</kda.version>
    </properties>

    <build>
        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <version>3.1</version>
                <configuration>
                    <source>${java.version}</source>
                    <target>${java.version}</target>
…

Tags: java, intellij-idea, noclassdeffounderror, amazon-web-services, apache-flink
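For what it's worth, a NoClassDefFoundError for org/apache/flink/streaming/api/functions/source/SourceFunction at launch time usually means the class was on the compile classpath but is missing from the runtime classpath, which is typical when the Flink core dependencies are scoped as provided and the main class is run directly. Since the POM above is truncated, the following is only a sketch under that assumption, reusing the properties already defined in the POM; the scope would need to be compile (the default) when running locally from the IDE:

<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
    <version>${flink.version}</version>
</dependency>

Note also that for Flink versions before 1.8, flink-connector-kinesis was not published to Maven Central and had to be built and installed locally.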
I am trying to connect to Cloud SQL from a Python pipeline.
The actual situation
The connection function
def cloudSQL(input):
    import pymysql
    connection = pymysql.connect(host='<server ip>',
                                 user='...',
                                 password='...',
                                 db='...')
    cursor = connection.cursor()
    cursor.execute("select ...")
    result = cursor.fetchone()  # fetch the row before closing the connection
    connection.close()
    if result is not None:
        yield input
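As a side note (not necessarily the cause of the timeout), here is a sketch of the same lookup restructured as a DoFn, so that each Dataflow worker opens a single connection in setup() instead of one per element, with an explicit connect timeout that makes failures easier to diagnose. The host and credentials are the same placeholders as above:

import apache_beam as beam

class CloudSQLFilter(beam.DoFn):
    # One connection per worker instead of one per element.
    def setup(self):
        import pymysql
        self.connection = pymysql.connect(host='<server ip>',
                                          user='...',
                                          password='...',
                                          db='...',
                                          connect_timeout=10)

    def process(self, element):
        with self.connection.cursor() as cursor:
            cursor.execute("select ...")
            if cursor.fetchone() is not None:
                yield element

    def teardown(self):
        self.connection.close()

In the pipeline this would be applied with beam.ParDo(CloudSQLFilter()) wherever cloudSQL is currently mapped over the collection.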
The error
This is the error message when using the DataflowRunner:
OperationalError: (2003, "Can't connect to MySQL server on '<server ip>' (timed out)")
Cloud SQL
I have a public IP (I tested from my local machine with the DirectRunner), and I also tried enabling a private IP to see whether that was the problem when connecting with the DataflowRunner.
Option 2
I also tried:
connection = pymysql.connect(unix_socket='/cloudsql/' + <INSTANCE_CONNECTION_NAME>,
                             user='...',
                             password='...',
                             db='...')
Which produces the error:

OperationalError: (2003, "Can't connect to …

Tags: python, google-cloud-sql, google-cloud-platform, google-cloud-dataflow
For example, suppose you have a GeoJSON file with a polygon (a simple file for testing):
{
  "type": "FeatureCollection",
  "features": [
    {
      "type": "Feature",
      "properties": {},
      "geometry": {
        "type": "Polygon",
        "coordinates": [
          [
            [-4.658203125, 41.343824581185686],
            [-5.6689453125, 39.13006024213511],
            [-1.9335937499999998, 39.16414104768742],
            [-1.3623046875, 41.21172151054787],
            [-4.658203125, 41.343824581185686]
          ]
        ]
      }
    }
  ]
}
And the point:
Geometry point2 = new WKTReader().read("POINT (-3.2958984375 40.44694705960048)");
And you want to load the GeoJSON file in your program to test whether the polygon contains the point. How can this be done in Java with JTS?
Another option could be GeoTools with its GeoJSON plugin, but I have not been able to parse the GeoJSON file.
What I have tried
Using GeoTools like this:
String content = new String(Files.readAllBytes(Paths.get("file.geojson")), "UTF-8");
GeometryJSON gjson = new GeometryJSON();
Reader reader = new StringReader(content);
Polygon p = gjson.readPolygon(reader); …
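In case it is useful, a sketch of the GeoTools route that can parse the whole file: GeometryJSON.readPolygon expects a bare geometry object, while the file above is a FeatureCollection, so the FeatureJSON reader (from the gt-geojson module) is the appropriate one. This assumes a recent GeoTools (20 or later) where JTS lives under org.locationtech:

import java.io.FileInputStream;
import org.geotools.feature.FeatureCollection;
import org.geotools.feature.FeatureIterator;
import org.geotools.geojson.feature.FeatureJSON;
import org.locationtech.jts.geom.Geometry;
import org.locationtech.jts.io.WKTReader;
import org.opengis.feature.simple.SimpleFeature;

public class ContainsTest {
    public static void main(String[] args) throws Exception {
        Geometry point = new WKTReader().read("POINT (-3.2958984375 40.44694705960048)");
        FeatureJSON fjson = new FeatureJSON();
        try (FileInputStream in = new FileInputStream("file.geojson")) {
            // Reads the whole FeatureCollection, not just a bare geometry.
            FeatureCollection<?, ?> fc = fjson.readFeatureCollection(in);
            try (FeatureIterator<?> features = fc.features()) {
                while (features.hasNext()) {
                    SimpleFeature feature = (SimpleFeature) features.next();
                    Geometry polygon = (Geometry) feature.getDefaultGeometry();
                    // Should print true: the point lies inside the test polygon above.
                    System.out.println(polygon.contains(point));
                }
            }
        }
    }
}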
I am trying to run some examples in Python as a self-contained Spark application (using PyCharm).

I installed pyspark with:
pip install pyspark
According to the example's web page, it should be enough to run:
python nameofthefile.py
But I get this error:
Exception in thread "main" java.lang.ExceptionInInitializerError
at org.apache.hadoop.util.StringUtils.<clinit>(StringUtils.java:80)
at org.apache.hadoop.security.SecurityUtil.getAuthenticationMethod(SecurityUtil.java:611)
at org.apache.hadoop.security.UserGroupInformation.initialize(UserGroupInformation.java:273)
at org.apache.hadoop.security.UserGroupInformation.ensureInitialized(UserGroupInformation.java:261)
at org.apache.hadoop.security.UserGroupInformation.loginUserFromSubject(UserGroupInformation.java:791)
at org.apache.hadoop.security.UserGroupInformation.getLoginUser(UserGroupInformation.java:761)
at org.apache.hadoop.security.UserGroupInformation.getCurrentUser(UserGroupInformation.java:634)
at org.apache.spark.util.Utils$$anonfun$getCurrentUserName$1.apply(Utils.scala:2422)
at org.apache.spark.util.Utils$$anonfun$getCurrentUserName$1.apply(Utils.scala:2422)
at scala.Option.getOrElse(Option.scala:121)
at org.apache.spark.util.Utils$.getCurrentUserName(Utils.scala:2422)
at org.apache.spark.SecurityManager.<init>(SecurityManager.scala:79)
at org.apache.spark.deploy.SparkSubmit.secMgr$lzycompute$1(SparkSubmit.scala:359)
at org.apache.spark.deploy.SparkSubmit.org$apache$spark$deploy$SparkSubmit$$secMgr$1(SparkSubmit.scala:359)
at org.apache.spark.deploy.SparkSubmit$$anonfun$prepareSubmitEnvironment$7.apply(SparkSubmit.scala:367)
at org.apache.spark.deploy.SparkSubmit$$anonfun$prepareSubmitEnvironment$7.apply(SparkSubmit.scala:367)
at scala.Option.map(Option.scala:146)
at org.apache.spark.deploy.SparkSubmit.prepareSubmitEnvironment(SparkSubmit.scala:366)
at org.apache.spark.deploy.SparkSubmit.submit(SparkSubmit.scala:143)
at org.apache.spark.deploy.SparkSubmit.doSubmit(SparkSubmit.scala:86)
at org.apache.spark.deploy.SparkSubmit$$anon$2.doSubmit(SparkSubmit.scala:924)
at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:933)
at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
Caused by: java.lang.StringIndexOutOfBoundsException: begin 0, end 3, length 2
at java.base/java.lang.String.checkBoundsBeginEnd(String.java:3319)
at …
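The trace is cut off here, but the Caused by line is recognizable: Hadoop parses the java.version system property with substring(0, 3), which throws exactly "begin 0, end 3, length 2" on JDK 9 and later, where the version string is just "11" or similar (the java.base/ prefix in the trace also only appears on JDK 9+). Spark 2.x expects Java 8, so here is a sketch of pinning the JDK before the JVM is launched; the JAVA_HOME path is a hypothetical example:

import os

# Spark 2.x requires Java 8; point JAVA_HOME at a JDK 8 install before the
# first SparkContext/SparkSession is created (the path below is an example).
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"

from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("smoke-test").getOrCreate()
print(spark.range(5).count())  # should print 5 if the JVM came up correctly
spark.stop()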
The current situation
The purpose of the pipeline is to read payloads with geo data from Pub/Sub, then transform and analyze that data, and finally return whether a condition is true or false.
with beam.Pipeline(options=pipeline_options) as p:
    raw_data = (p
                | 'Read from PubSub' >> beam.io.ReadFromPubSub(
                      subscription='projects/XXX/subscriptions/YYY'))

    geo_data = (raw_data
                | 'Geo data transform' >> beam.Map(lambda s: GeoDataIngestion(s)))

def GeoDataIngestion(string_input):
    <...>
    return True or False
Ideal case 1
If the GeoDataIngestion result is true, raw_data will be stored in BigQuery.
geo_data = (raw_data
            | 'Geo data transform' >> beam.Map(lambda s: GeoDataIngestion(s))
            | 'Evaluate condition' >> beam.Map(lambda s: Condition(s))
            )

def Condition(condition):
    if condition:
        <...WriteToBigQuery...>

# The class I used before to store raw_data without depending on the evaluated condition:
class WriteToBigQuery(beam.PTransform): …
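Not part of the original snippets, but one way to sketch the "store raw_data only when the condition holds" behaviour without calling WriteToBigQuery from inside Condition(): pair each element with its evaluated flag, filter on the flag, and hand the surviving raw payloads to the existing write transform. Names are reused from the snippets above:

# A minimal sketch, reusing raw_data and GeoDataIngestion from above.
to_store = (raw_data
            | 'Pair with condition' >> beam.Map(lambda s: (s, GeoDataIngestion(s)))
            | 'Keep condition true' >> beam.Filter(lambda pair: pair[1])
            | 'Back to raw payload' >> beam.Map(lambda pair: pair[0]))
# to_store can then be passed to the WriteToBigQuery PTransform above.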
I am trying to write a unit test in Dataflow.

For that test, to begin with, I will start from a simple hard-coded string.

The problem is that I need to transform that string into a Pub/Sub message. I have the following code to do that:
// Create a PCollection from a string and transform it to Pub/Sub message format
PCollection<PubsubMessage> input = p
        .apply("input string", Create.of("test" + ""))
        .apply("convert to Pub/Sub message", ParDo.of(new DoFn<String, PubsubMessage>() {
            @ProcessElement
            public void processElement(ProcessContext c) {
                c.output(new PubsubMessage(c.element().getBytes(), null));
            }
        }));
But I get the following error:
java.lang.IllegalArgumentException: unable to serialize DoFnWithExecutionInformation{doFn=com.xxx.pipeline.TesterPipeline$1@7b64240d, mainOutputTag=Tag<output>, sideInputMapping={}, schemaInformation=DoFnSchemaInformation{elementConverters=[]}}
at org.apache.beam.sdk.util.SerializableUtils.serializeToByteArray(SerializableUtils.java:55)
<...>
Caused by: java.io.NotSerializableException: com.xxx.pipeline.TesterPipeline
at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1184)
at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178) …
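The root cause line (Caused by: java.io.NotSerializableException: com.xxx.pipeline.TesterPipeline) points at the enclosing test class rather than the DoFn itself: an anonymous inner class keeps an implicit reference to its enclosing instance, so Beam ends up trying to serialize the whole test class. A sketch of the usual workaround is a static nested class, which captures nothing from the enclosing class (assumes an import of java.nio.charset.StandardCharsets):

// Static nested DoFn: no implicit reference to the enclosing test class.
static class StringToPubsubMessageFn extends DoFn<String, PubsubMessage> {
    @ProcessElement
    public void processElement(ProcessContext c) {
        c.output(new PubsubMessage(c.element().getBytes(StandardCharsets.UTF_8), null));
    }
}

// In the test:
PCollection<PubsubMessage> input = p
        .apply("input string", Create.of("test"))
        .apply("convert to Pub/Sub message", ParDo.of(new StringToPubsubMessageFn()));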
I want to use Firestore in a Dataflow template with Python.

I did something like this:
with beam.Pipeline(options=options) as p:
    (p
     | 'Read from PubSub' >> beam.io.ReadFromPubSub(sub).with_output_types(bytes)
     | 'String to dictionary' >> beam.Map(firestore_update_multiple)
    )
Is this the proper way to use it?
Additional information
def firestore_update_multiple(row):
    from google.cloud import firestore
    db = firestore.Client()
    doc_ref = db.collection(u'data').document(u'one')
    doc_ref.update({
        u'arrayExample': u'DataflowRunner',
        u'booleanExample': True
    })
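As a sketch only: beam.Map runs firestore_update_multiple once per element, and each call builds a new firestore.Client. Moving the work into a DoFn lets the client be created once per worker in setup():

import apache_beam as beam

class FirestoreUpdateFn(beam.DoFn):
    # One Firestore client per worker instead of one per element.
    def setup(self):
        from google.cloud import firestore
        self._db = firestore.Client()

    def process(self, row):
        self._db.collection(u'data').document(u'one').update({
            u'arrayExample': u'DataflowRunner',
            u'booleanExample': True,
        })
        yield row

The 'String to dictionary' step would then read beam.ParDo(FirestoreUpdateFn()) instead of beam.Map(firestore_update_multiple).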
Tags: python, google-cloud-platform, google-cloud-dataflow, google-cloud-firestore
This is a very simple question, but I have searched for a solution and nothing has worked for me. I added the following to my pom.xml, but I cannot import org.apache.commons.csv.CSVParser in my class. When I run mvn install, I get the output below.
<dependencies>
....
<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-csv</artifactId>
<version>1.1.1-SNAPSHOT</version>
</dependency>
....
</dependencies>
D:\java\my_project>mvn install
[INFO] Scanning for projects...
[INFO] Building my_project 1.0-SNAPSHOT
[WARNING] The POM for org.apache.commons:commons-csv:jar:1.1.1-SNAPSHOT is missing, no dependency information available
[INFO] BUILD FAILURE
[INFO] Total time: 0.644 s
[INFO] Finished at: 2015-07-21T08:46:47-04:00
[INFO] Final Memory: 8M/245M
[ERROR] Failed to execute goal on project my_project: Could not resolve depen dencies for project com.myproject:artifact:jar:1.0-SNAPSHOT: Could not find artifact org.apache.commons:commons-csv:jar:1.1.1-SNAPSHOT -> [Help 1]
[ERROR] To see the …
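For context, a guess based on the WARNING line above: 1.1.1-SNAPSHOT is not a released version of commons-csv, and snapshot artifacts are not served from Maven Central, which is why the POM for it cannot be resolved. A sketch using a released version instead (check Maven Central for the latest one):

<dependency>
    <groupId>org.apache.commons</groupId>
    <artifactId>commons-csv</artifactId>
    <version>1.1</version>
</dependency>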