Posts by IoT*_*ser

Apache Flink: java.lang.NoClassDefFoundError

I'm trying to follow this example, but when I try to run it I get this error:

Error: Unable to initialize main class com.amazonaws.services.kinesisanalytics.aws
Caused by: java.lang.NoClassDefFoundError: org/apache/flink/streaming/api/functions/source/SourceFunction

The error comes from this code:

    private static DataStream<String> createSourceFromStaticConfig(StreamExecutionEnvironment env) {
        Properties inputProperties = new Properties();
        inputProperties.setProperty(ConsumerConfigConstants.AWS_REGION, region);
        inputProperties.setProperty(ConsumerConfigConstants.STREAM_INITIAL_POSITION, "LATEST");

        return env.addSource(new FlinkKinesisConsumer<>(inputStreamName, new SimpleStringSchema(), inputProperties));
    }

I think this is the offending line:

  return env.addSource(new FlinkKinesisConsumer<>(inputStreamName, new SimpleStringSchema(), inputProperties));

These are my Maven dependencies:

(screenshot of the Maven dependencies)

What could be the problem? A dependency? A version?

Note: if I comment out the offending line, the program runs without problems.

The POM file:

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <groupId>as</groupId>
    <artifactId>a</artifactId>
    <version>1</version>
    <packaging>jar</packaging>
    <properties>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <geotools.version>21-SNAPSHOT</geotools.version>
        <java.version>1.8</java.version>
        <scala.binary.version>2.11</scala.binary.version>
        <flink.version>1.6.2</flink.version>
        <kda.version>1.0.1</kda.version>
    </properties>
    <build>
        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <version>3.1</version>
                <configuration>
                    <source>${java.version}</source>
                    <target>${java.version}</target> …
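A likely cause, though the post does not confirm it: the missing class org/apache/flink/streaming/api/functions/source/SourceFunction lives in the flink-streaming-java artifact, and the AWS Kinesis Analytics samples commonly mark the Flink dependencies as provided, which keeps them off the classpath when the main class is launched from the IDE. A sketch of the dependency, reusing the flink.version and scala.binary.version properties from the POM above:

```xml
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
    <version>${flink.version}</version>
    <!-- default (compile) scope keeps the class available when running
         from the IDE; provided is used when the cluster ships Flink -->
</dependency>
```

In IntelliJ IDEA, an alternative is the run-configuration option to include dependencies with "provided" scope.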

java intellij-idea noclassdeffounderror amazon-web-services apache-flink

Score 10 · 2 answers · 9234 views

Connecting to CloudSQL from Dataflow in Python

I'm trying to connect to CloudSQL from a Python pipeline.


Current situation

  • I can do it without any problem using the DirectRunner
  • I cannot connect using the DataflowRunner

Connection function

def cloudSQL(input):
    import pymysql
    connection = pymysql.connect(host='<server ip>',
                                 user='...',
                                 password='...',
                                 db='...')
    cursor = connection.cursor()
    cursor.execute("select ...")
    result = cursor.fetchone()   # fetch before closing the connection
    connection.close()
    if result is not None:
        yield input

Error

This is the error message when using the DataflowRunner:

OperationalError: (2003, "Can't connect to MySQL server on '<server ip>' (timed out)")

Cloud SQL

The instance has a public IP (which I use to test locally with the DirectRunner), and I also tried enabling a private IP to see whether that was the problem when connecting with the DataflowRunner.


Option 2

I also tried:

connection = pymysql.connect(unix_socket='/cloudsql/' + <INSTANCE_CONNECTION_NAME>,
                             user='...',
                             password='...',
                             db='...')

which gives the error:

OperationalError: (2003, "Can't connect to …
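For reference, a framework-free sketch of the two connection variants (the helper name and defaults are hypothetical, not from the post). The '/cloudsql/…' unix socket only exists where a Cloud SQL proxy socket is mounted (e.g. App Engine), which plain Dataflow workers don't provide by default — a plausible reason the DirectRunner (local machine with a reachable public IP) connects while the DataflowRunner times out:

```python
def connection_kwargs(instance_connection_name=None, host=None,
                      user='user', password='secret', db='mydb'):
    """Build kwargs for pymysql.connect() for the two Cloud SQL paths.

    - unix socket: '/cloudsql/PROJECT:REGION:INSTANCE' (needs a proxy socket)
    - TCP: public/private IP (needs network access from the workers)
    """
    kwargs = {'user': user, 'password': password, 'db': db}
    if instance_connection_name:
        kwargs['unix_socket'] = '/cloudsql/' + instance_connection_name
    else:
        kwargs['host'] = host
    return kwargs
```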

python google-cloud-sql google-cloud-platform google-cloud-dataflow

Score 5 · 1 answer · 3933 views

Parsing a GeoJSON file with the Java Topology Suite or GeoTools

For example, say you have a GeoJSON file with a polygon (a simple file for testing):

{
  "type": "FeatureCollection",
  "features": [
    {
      "type": "Feature",
      "properties": {},
      "geometry": {
        "type": "Polygon",
        "coordinates": [
          [
            [
              -4.658203125,
              41.343824581185686
            ],
            [
              -5.6689453125,
              39.13006024213511
            ],
            [
              -1.9335937499999998,
              39.16414104768742
            ],
            [
              -1.3623046875,
              41.21172151054787
            ],
            [
              -4.658203125,
              41.343824581185686
            ]
          ]
        ]
      }
    }
  ]
}

and a point:

Geometry point2 = new WKTReader().read("POINT (-3.2958984375 40.44694705960048)");

and you want to load the GeoJSON file in your program to test whether the polygon contains the point — how can you do that in Java with JTS?


Another option could be GeoTools with its GeoJSON plugin, but I haven't been able to parse the GeoJSON file with it.


What I've tried

Using GeoTools like this:

String content = new String(Files.readAllBytes(Paths.get("file.geojson")), "UTF-8");
GeometryJSON gjson = new GeometryJSON();
Reader reader = new StringReader(content);
Polygon p = gjson.readPolygon(reader); …
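In JTS itself the containment test is polygon.contains(point) once both geometries are built; the GeoJSON parsing is the hard part. As a sanity check independent of either library, here is a plain-Python sketch (stdlib json plus even-odd ray casting) against the polygon above — point_in_ring is a hypothetical helper, not JTS/GeoTools API:

```python
import json

# The FeatureCollection from the question, inlined for a self-contained test.
GEOJSON = ('{"type":"FeatureCollection","features":[{"type":"Feature",'
           '"properties":{},"geometry":{"type":"Polygon","coordinates":'
           '[[[-4.658203125,41.343824581185686],[-5.6689453125,'
           '39.13006024213511],[-1.9335937499999998,39.16414104768742],'
           '[-1.3623046875,41.21172151054787],[-4.658203125,'
           '41.343824581185686]]]}}]}')

def point_in_ring(ring, x, y):
    """Even-odd rule: count edge crossings of a ray going right from (x, y)."""
    inside = False
    for (x1, y1), (x2, y2) in zip(ring, ring[1:]):
        if (y1 > y) != (y2 > y):                       # edge spans the ray's y
            x_cross = x1 + (y - y1) * (x2 - x1) / (y2 - y1)
            if x_cross > x:
                inside = not inside
    return inside

feature = json.loads(GEOJSON)["features"][0]
ring = [tuple(c) for c in feature["geometry"]["coordinates"][0]]
```

The GeoJSON outer ring is closed (last coordinate repeats the first), which is exactly what the consecutive-pair loop expects.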

java jts geotools

Score 5 · 1 answer · 1331 views

PyCharm: Java gateway process exited before sending its port number

I'm trying to run (from PyCharm) some examples of self-contained Spark applications in Python.

I installed pyspark with:

pip install pyspark 

According to the example's web page, running this should be enough:

python nameofthefile.py

But I get this error:

Exception in thread "main" java.lang.ExceptionInInitializerError
    at org.apache.hadoop.util.StringUtils.<clinit>(StringUtils.java:80)
    at org.apache.hadoop.security.SecurityUtil.getAuthenticationMethod(SecurityUtil.java:611)
    at org.apache.hadoop.security.UserGroupInformation.initialize(UserGroupInformation.java:273)
    at org.apache.hadoop.security.UserGroupInformation.ensureInitialized(UserGroupInformation.java:261)
    at org.apache.hadoop.security.UserGroupInformation.loginUserFromSubject(UserGroupInformation.java:791)
    at org.apache.hadoop.security.UserGroupInformation.getLoginUser(UserGroupInformation.java:761)
    at org.apache.hadoop.security.UserGroupInformation.getCurrentUser(UserGroupInformation.java:634)
    at org.apache.spark.util.Utils$$anonfun$getCurrentUserName$1.apply(Utils.scala:2422)
    at org.apache.spark.util.Utils$$anonfun$getCurrentUserName$1.apply(Utils.scala:2422)
    at scala.Option.getOrElse(Option.scala:121)
    at org.apache.spark.util.Utils$.getCurrentUserName(Utils.scala:2422)
    at org.apache.spark.SecurityManager.<init>(SecurityManager.scala:79)
    at org.apache.spark.deploy.SparkSubmit.secMgr$lzycompute$1(SparkSubmit.scala:359)
    at org.apache.spark.deploy.SparkSubmit.org$apache$spark$deploy$SparkSubmit$$secMgr$1(SparkSubmit.scala:359)
    at org.apache.spark.deploy.SparkSubmit$$anonfun$prepareSubmitEnvironment$7.apply(SparkSubmit.scala:367)
    at org.apache.spark.deploy.SparkSubmit$$anonfun$prepareSubmitEnvironment$7.apply(SparkSubmit.scala:367)
    at scala.Option.map(Option.scala:146)
    at org.apache.spark.deploy.SparkSubmit.prepareSubmitEnvironment(SparkSubmit.scala:366)
    at org.apache.spark.deploy.SparkSubmit.submit(SparkSubmit.scala:143)
    at org.apache.spark.deploy.SparkSubmit.doSubmit(SparkSubmit.scala:86)
    at org.apache.spark.deploy.SparkSubmit$$anon$2.doSubmit(SparkSubmit.scala:924)
    at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:933)
    at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
Caused by: java.lang.StringIndexOutOfBoundsException: begin 0, end 3, length 2
    at java.base/java.lang.String.checkBoundsBeginEnd(String.java:3319)
    at …
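A common cause of exactly this trace (an assumption, since the post doesn't show the JVM version): Spark 2.x only supports Java 8, and on Java 9+ Hadoop's version check does substring(0, 3) on a short version string such as "10", which produces the StringIndexOutOfBoundsException: begin 0, end 3, length 2. Pointing JAVA_HOME at a JDK 8 install typically fixes it. A small hypothetical helper for telling the two version schemes apart:

```python
def java_major_version(version_string):
    """Return the major Java version: '1.8.0_181' -> 8, '11.0.2' -> 11."""
    parts = version_string.split('.')
    if parts[0] == '1':                      # legacy scheme: 1.MAJOR.MINOR
        return int(parts[1])
    return int(parts[0].split('_')[0])       # modern scheme: MAJOR[.x[.y]]

# Spark 2.x expects Java 8, so e.g.:
#   if java_major_version(detected) != 8: warn, or repoint JAVA_HOME
```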

python pycharm pyspark

Score 5 · 1 answer · 6306 views

Conditional statements in a Python Apache Beam pipeline

Current situation

The purpose of this pipeline is to read a payload with geo data from Pub/Sub, transform and analyze that data, and finally return whether a condition is true or false.

with beam.Pipeline(options=pipeline_options) as p:
    raw_data = (p
                | 'Read from PubSub' >> beam.io.ReadFromPubSub(
                    subscription='projects/XXX/subscriptions/YYY'))

    geo_data = (raw_data
                | 'Geo data transform' >> beam.Map(lambda s: GeoDataIngestion(s)))


def GeoDataIngestion(string_input):
    <...>
    return True or False  # i.e. returns a boolean

Ideal case 1

If the GeoDataIngestion result is true, then raw_data should be stored in BigQuery.

geo_data = (raw_data
                | 'Geo data transform' >> beam.Map(lambda s: GeoDataIngestion(s))
                | 'Evaluate condition' >> beam.Map(lambda s: Condition(s))
                )

def Condition(condition):
    if condition:
        <...WriteToBigQuery...>


#The class I used before to store raw_data without depending on evaluate condition:

class WriteToBigQuery(beam.PTransform): …
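One way to express "only write when the condition holds" in the Beam Python SDK is beam.Filter, which keeps only the elements whose predicate returns true, so the write transform never sees the rest. Below is a framework-free sketch of that routing, with geo_condition as a hypothetical stand-in for the real GeoDataIngestion logic:

```python
def geo_condition(payload):
    # Hypothetical stand-in for GeoDataIngestion: some geo test -> bool.
    return payload.get('inside_area', False)

def route(elements):
    """Split a batch: passing elements go to BigQuery, the rest are dropped."""
    to_bigquery = [e for e in elements if geo_condition(e)]
    dropped = [e for e in elements if not geo_condition(e)]
    return to_bigquery, dropped

# In the pipeline this would be roughly:
#   raw_data | 'keep matches' >> beam.Filter(geo_condition) | <WriteToBigQuery...>
```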

python google-cloud-dataflow apache-beam

Score 3 · 1 answer · 4380 views

Dataflow: string to Pub/Sub message

I'm trying to write a unit test for a Dataflow pipeline.

For that test, to begin with, I will start from a simple hard-coded string.

The problem is that I need to convert that string into a Pub/Sub message. I have the following code to do that:

    // Create a PCollection from string a transform to pubsub message format
    PCollection<PubsubMessage> input = p.apply("input string", Create.of("test" + 
            ""))
            .apply("convert to Pub/Sub message", ParDo.of(new DoFn<String, PubsubMessage>() {
                @ProcessElement
                public void processElement(ProcessContext c) {
                    c.output(new PubsubMessage(c.element().getBytes(), null));
                }
            }));

But I get the following error:

 java.lang.IllegalArgumentException: unable to serialize DoFnWithExecutionInformation{doFn=com.xxx.pipeline.TesterPipeline$1@7b64240d, mainOutputTag=Tag<output>, sideInputMapping={}, schemaInformation=DoFnSchemaInformation{elementConverters=[]}}
    at org.apache.beam.sdk.util.SerializableUtils.serializeToByteArray(SerializableUtils.java:55)
    <...>
Caused by: java.io.NotSerializableException: com.xxx.pipeline.TesterPipeline
    at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1184)
    at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
    at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
    at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
    at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
    at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
    at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
    at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
    at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178) …
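Note the Caused by line: it is TesterPipeline, not the DoFn, that fails to serialize. In Java, an anonymous inner class keeps a reference to its enclosing instance, so serializing the DoFn drags the whole test class along with it; the usual fix is to declare the DoFn as a named static (or top-level) class. The same capture rule is easy to demonstrate with Python's pickle (a hypothetical analogy, not Beam code):

```python
import pickle

def top_level(x):
    # Defined at module scope: picklable by reference, no captured owner.
    return x.upper()

class Enclosing:
    def make_inner(self):
        def inner(x):
            # Local object: pickle refuses it, analogous to the inner DoFn
            # that drags its enclosing (non-serializable) class with it.
            return x.upper()
        return inner

def can_pickle(obj):
    try:
        pickle.dumps(obj)
        return True
    except (pickle.PicklingError, AttributeError, TypeError):
        return False
```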

java google-cloud-dataflow apache-beam

Score 2 · 1 answer · 525 views

Using Firestore from Google Dataflow

I want to use Firestore in a Dataflow template written in Python.

I did something like this:

with beam.Pipeline(options=options) as p:
    (p
     | 'Read from PubSub' >> beam.io.ReadFromPubSub(sub).with_output_types(bytes)
     | 'String to dictionary' >> beam.Map(firestore_update_multiple)
     )

Is this an appropriate way to use it?


Additional information

def firestore_update_multiple(row):
    from google.cloud import firestore
    db = firestore.Client()
    doc_ref = db.collection(u'data').document(u'one')

    doc_ref.update({
        u'arrayExample': u'DataflowRunner',
        u'booleanExample': True
    })
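One concern with firestore_update_multiple: it builds a new firestore.Client() for every element. A beam.DoFn can create the client once in setup() and reuse it from process(); below is a framework-free sketch of that shape, with the client factory injected (a hypothetical parameter) so the sketch runs without google-cloud-firestore installed:

```python
class FirestoreUpdateFn:
    """Mimics a beam.DoFn: expensive client built once, reused per element."""

    def __init__(self, client_factory):
        self._client_factory = client_factory   # in real code: firestore.Client
        self._client = None
        self.clients_created = 0

    def setup(self):
        # beam.DoFn.setup runs once per DoFn instance, not once per element.
        self._client = self._client_factory()
        self.clients_created += 1

    def process(self, row):
        # Real code would do something like:
        #   self._client.collection(u'data').document(u'one').update(row)
        yield row
```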

python google-cloud-platform google-cloud-dataflow google-cloud-firestore

Score 1 · 1 answer · 2141 views

Using Maven, how do I add a dependency to the POM and download it?

This is a very simple question, but I've searched for a solution and nothing has worked for me. I added the following to my pom.xml, but I can't import org.apache.commons.csv.CSVParser in my class. When I run mvn install I get the output below.

<dependencies> 
      ....

    <dependency>
        <groupId>org.apache.commons</groupId>
        <artifactId>commons-csv</artifactId>
        <version>1.1.1-SNAPSHOT</version>
    </dependency>

    ....
 </dependencies>

D:\java\my_project>mvn install    
[INFO] Scanning for projects...    
[INFO] Building my_project 1.0-SNAPSHOT

[WARNING] The POM for org.apache.commons:commons-csv:jar:1.1.1-SNAPSHOT is missing, no dependency information available

[INFO] BUILD FAILURE    

[INFO] Total time: 0.644 s    
[INFO] Finished at: 2015-07-21T08:46:47-04:00    
[INFO] Final Memory: 8M/245M    

[ERROR] Failed to execute goal on project my_project: Could not resolve dependencies for project com.myproject:artifact:jar:1.0-SNAPSHOT: Could not find artifact org.apache.commons:commons-csv:jar:1.1.1-SNAPSHOT -> [Help 1]  
[ERROR] To see the …
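The WARNING line explains the failure: org.apache.commons:commons-csv:jar:1.1.1-SNAPSHOT is not a published artifact, so Maven has nothing to download (SNAPSHOT versions generally never reach Maven Central). Depending on a released version resolves it; for example, 1.1 was a real commons-csv release at the time:

```xml
<dependency>
    <groupId>org.apache.commons</groupId>
    <artifactId>commons-csv</artifactId>
    <version>1.1</version>
</dependency>
```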

java maven

Score -1 · 1 answer · 850 views