为了给backfround我已经加载了JSON
sqlContext.read.json(sn3://...)
df.registerTable("posts")
Run Code Online (Sandbox Code Playgroud)
我在Spark中的表有以下模式
scala> posts.printSchema
root
|-- command: string (nullable = true)
|-- externalId: string (nullable = true)
|-- sourceMap: struct (nullable = true)
| |-- hashtags: array (nullable = true)
| | |-- element: string (containsNull = true)
| |-- url: string (nullable = true)
|-- type: string (nullable = true)
Run Code Online (Sandbox Code Playgroud)
我想用主题标签"nike"选择所有帖子
sqlContext.sql("select sourceMap['hashtags'] as ht from posts where ht.contains('nike')");
Run Code Online (Sandbox Code Playgroud)
我得到一个错误未定义函数ht.contains
我不确定在数组中使用什么方法进行搜索.
谢谢!
我有一个应用程序,其中1.我使用SqlContext.read.json从S3读取JSON文件到Dataframe 2.然后在DataFrame 3上进行一些转换.最后,我想使用记录值之一作为键将记录保存到DynamoDB.其余的JSON参数作为值/列.
我正在尝试这样的事情:
JobConf jobConf = new JobConf(sc.hadoopConfiguration());
jobConf.set("dynamodb.servicename", "dynamodb");
jobConf.set("dynamodb.input.tableName", "my-dynamo-table"); // Pointing to DynamoDB table
jobConf.set("dynamodb.endpoint", "dynamodb.us-east-1.amazonaws.com");
jobConf.set("dynamodb.regionid", "us-east-1");
jobConf.set("dynamodb.throughput.read", "1");
jobConf.set("dynamodb.throughput.read.percent", "1");
jobConf.set("dynamodb.version", "2011-12-05");
jobConf.set("mapred.output.format.class", "org.apache.hadoop.dynamodb.write.DynamoDBOutputFormat");
jobConf.set("mapred.input.format.class", "org.apache.hadoop.dynamodb.read.DynamoDBInputFormat");
DataFrame df = sqlContext.read().json("s3n://mybucket/abc.json");
RDD<String> jsonRDD = df.toJSON();
JavaRDD<String> jsonJavaRDD = jsonRDD.toJavaRDD();
PairFunction<String, Text, DynamoDBItemWritable> keyData = new PairFunction<String, Text, DynamoDBItemWritable>() {
public Tuple2<Text, DynamoDBItemWritable> call(String row) {
DynamoDBItemWritable writeable = new DynamoDBItemWritable();
try {
System.out.println("JSON : " + row);
JSONObject jsonObject = new JSONObject(row);
System.out.println("JSON Object: " …Run Code Online (Sandbox Code Playgroud) amazon-emr amazon-dynamodb apache-spark apache-spark-sql spark-dataframe
我有一个项目,我有一个接口,一个实现相同接口的Abstract类,然后是一组实现此接口并扩展Abstract类的具体类.
public interface Invoice
{
void process();
}
@component
public abstract class AbstractInvoice(){
@Resource
protected Writer writer;
protected validateInvoice(){
//some implementation
}
}
@Component
public Class TypeAInvoice() extends AbstractInvoice implements Invoice{
@Override
public void process(){
//... some code
writer.write();
}
}
public Interface Writer(){
public void write();
}
@Component
public class CDWriter implements Writer{
@Override
public void write() { /* implementation.....*/}
}
Run Code Online (Sandbox Code Playgroud)
Spring文件包含组件扫描.
<context:annotation-config>
<context:component-scan base-package="com.xyz" />
Run Code Online (Sandbox Code Playgroud)
我正在使用工厂来获取TypeAInvoice发票实例现在调用时invoice.process()会获得NPEwrite.write()
我不知道我在这里错过了什么.我试图查看组件扫描和范围,但在概念上找不到任何错误.
我有一个json数据集,格式如下,每行一个条目.
{ "sales_person_name" : "John", "products" : ["apple", "mango", "guava"]}
{ "sales_person_name" : "Tom", "products" : ["mango", "orange"]}
{ "sales_person_name" : "John", "products" : ["apple", "banana"]}
{ "sales_person_name" : "Steve", "products" : ["apple", "mango"]}
{ "sales_person_name" : "Tom", "products" : ["mango", "guava"]}
Run Code Online (Sandbox Code Playgroud)
我想知道谁卖了最大的芒果等等.因此,我想将文件加载到dataframe,并为每个事务的数组中的每个产品值发出(key,value)对(product,name).
var df = spark.read.json("s3n://sales-data.json")
df.printSchema()
root
|-- sales_person_name: string (nullable = true)
|-- products: array (nullable = true)
var nameProductsMap = df.select("sales_person_name", "products").show()
+-----------------+--------------------+
|sales_person_name| products |
+-----------------+--------------------+
| John|[mango, apple,... |
| Tom|[mango, orange,... |
| …Run Code Online (Sandbox Code Playgroud)