I am trying to run an example like https://github.com/apache/spark/blob/master/examples/src/main/scala/org/apache/spark/examples/sql/streaming/StructuredKafkaWordCount.scala. I started from the Spark Structured Streaming Programming Guide at http://spark.apache.org/docs/latest/structured-streaming-programming-guide.html.

My code is:
package io.boontadata.spark.job1

import org.apache.spark.sql.SparkSession

object DirectKafkaAggregateEvents {

  val FIELD_MESSAGE_ID = 0
  val FIELD_DEVICE_ID = 1
  val FIELD_TIMESTAMP = 2
  val FIELD_CATEGORY = 3
  val FIELD_MEASURE1 = 4
  val FIELD_MEASURE2 = 5

  def main(args: Array[String]) {
    if (args.length < 3) {
      System.err.println(s"""
        |Usage: DirectKafkaAggregateEvents <brokers> <subscribeType> <topics>
        |  <brokers> is a list of one or more Kafka brokers
        |  <subscribeType> sample value: subscribe
        |  <topics> is a list of one or more Kafka topics to consume from
        """.stripMargin)
      System.exit(1)
    }
    // … (rest of the snippet truncated in the original post)
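Given the sbt / sbt-assembly tags: this is the kind of build definition I am using to produce a fat jar for this job. It is a minimal sketch; the project name and the Scala/Spark versions (2.11.8 / 2.1.0) are my assumptions, and the key point is that spark-sql-kafka-0-10 has to be bundled (or supplied via --packages), because Spark does not ship the Kafka source for Structured Streaming by itself.

// build.sbt — minimal sketch (names and versions are assumptions; adjust them).
name := "boontadata-spark-job1"
version := "0.1"
scalaVersion := "2.11.8"

val sparkVersion = "2.1.0"

libraryDependencies ++= Seq(
  // "provided": the cluster's Spark runtime supplies these at run time.
  "org.apache.spark" %% "spark-core" % sparkVersion % "provided",
  "org.apache.spark" %% "spark-sql"  % sparkVersion % "provided",
  // The Kafka source for Structured Streaming is a separate artifact,
  // so it must go into the assembly (or be added with --packages).
  "org.apache.spark" %% "spark-sql-kafka-0-10" % sparkVersion
)

// Keep sbt-assembly from failing on duplicate META-INF entries.
assemblyMergeStrategy in assembly := {
  case PathList("META-INF", xs @ _*) => MergeStrategy.discard
  case _                             => MergeStrategy.first
}

This also assumes sbt-assembly is enabled in project/plugins.sbt, e.g. addSbtPlugin("com.eed3si9n" % "sbt-assembly" % "0.14.3"); running sbt assembly then produces a jar that can be handed to spark-submit.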
I have a set of ~36,000 polygons representing the subdivisions of a country (~counties). My Python script receives many points: pointId, longitude, latitude.

For each point, I want to send back pointId, polygonId. Looping over all the polygons for every point and calling myPoint.within(myPolygon) is very inefficient.

I think the Shapely library offers a better way to prepare the polygons, so that finding the polygon a point falls in becomes a tree traversal (country, region, sub-region, ...), along the lines of the sketch below.
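Concretely, the lookup I am hoping for would look something like this. It is only a sketch: it assumes Shapely 2.x, where STRtree.query returns integer indices into the list the tree was built from (on Shapely 1.x it returns geometries instead), and the polygons / polygon_ids lists are placeholders for my real data.

# Sketch: point-in-polygon lookup via Shapely's STRtree spatial index
# (assumes Shapely 2.x, where query() returns integer indices).
from shapely.geometry import Point, Polygon
from shapely.strtree import STRtree

# Placeholder inputs: one Polygon per canton plus a parallel list of ids.
polygons = [Polygon([(0, 0), (1, 0), (1, 1), (0, 1)]),
            Polygon([(1, 0), (2, 0), (2, 1), (1, 1)])]
polygon_ids = ['canton-A', 'canton-B']

tree = STRtree(polygons)  # build the R-tree once, up front

def polygon_id_for(point):
    # query(..., predicate='within') returns indices of the tree geometries
    # the point is within; for a point in a partition's interior there is
    # exactly one hit (boundary points may match two neighbours).
    hits = tree.query(point, predicate='within')
    return polygon_ids[hits[0]] if len(hits) else None

print(polygon_id_for(Point(0.5, 0.5)))  # -> 'canton-A'
print(polygon_id_for(Point(1.5, 0.5)))  # -> 'canton-B'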
Here is my code so far:
import sys
import os
import json
import time
import string
import uuid
py_id = str(uuid.uuid4())
sys.stderr.write(py_id + '\n')
sys.stderr.write('point_in_polygon.py V131130a.\n')
sys.stderr.flush()
from shapely.geometry import Point
from shapely.geometry import Polygon

jsonpath = './cantons.json'  # forward slashes avoid backslash-escape surprises
jsonfile = json.loads(open(jsonpath).read())
def find(id, obj):
    results = []
    def _find_values(id, obj):
        try:
            for key, value in obj.iteritems():  # Python 2; use obj.items() on Python 3
                if key == id:
                    results.append(value)
                elif not isinstance(value, basestring):  # Python 2; use str on Python 3
                    _find_values(id, value)
        except AttributeError:
            pass
        try:
            for item in obj:
                # … (rest of the snippet truncated in the original post)
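The find helper above is just a generic key search through the parsed JSON. Assuming cantons.json is a regular GeoJSON FeatureCollection with a polygonId property on each feature (an assumption on my part; the file's structure is not shown above), the Shapely polygons could instead be built directly and fed to the STRtree sketch earlier:

# Sketch: build Shapely geometries straight from GeoJSON features.
# Assumes a FeatureCollection whose features carry properties['polygonId'];
# both names are placeholders for whatever cantons.json really contains.
from shapely.geometry import shape

polygons = []
polygon_ids = []
for feature in jsonfile['features']:
    polygons.append(shape(feature['geometry']))   # Polygon or MultiPolygon
    polygon_ids.append(feature['properties']['polygonId'])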