我正在尝试对将数据写入S3,然后从同一S3位置读取相同数据的函数进行单元测试。我正在尝试使用moto和boto(2.x)来实现[1]。问题是该服务返回我被禁止访问密钥[2]的信息。moto github存储库中报告了类似的问题(即使错误消息有所不同)[3],但尚未解决。
有没有人成功地在PySpark中测试过模拟的s3读写,以分享一些见解?
[1]
import boto
from boto.s3.key import Key
from moto import mock_s3
_test_bucket = 'test-bucket'
_test_key = 'data.csv'
@pytest.fixture(scope='function')
def spark_context(request):
conf = SparkConf().setMaster("local[2]").setAppName("pytest-pyspark-local-testing")
sc = SparkContext(conf=conf)
sc._jsc.hadoopConfiguration().set("fs.s3n.awsAccessKeyId", 'test-access-key-id')
sc._jsc.hadoopConfiguration().set("fs.s3n.awsSecretAccessKey", 'test-secret-access-key')
request.addfinalizer(lambda: sc.stop())
quiet_py4j(sc)
return sc
spark_test = pytest.mark.usefixtures("spark_context")
@spark_test
@mock_s3
def test_tsv_read_from_and_write_to_s3(spark_context):
spark = SQLContext(spark_context)
s3_conn = boto.connect_s3()
s3_bucket = s3_conn.create_bucket(_test_bucket)
k = Key(s3_bucket)
k.key = _test_key
k.set_contents_from_string('')
s3_uri = 's3n://{}/{}'.format(_test_bucket, _test_key)
df = (spark
.read
.csv(s3_uri))
Run Code Online (Sandbox Code Playgroud)
[2]
(...)
E py4j.protocol.Py4JJavaError: An error …Run Code Online (Sandbox Code Playgroud) 我有一个csv文件[1],我想直接加载到数据集中.问题是我总是得到错误
org.apache.spark.sql.AnalysisException: Cannot up cast `probability` from string to float as it may truncate
The type path of the target object is:
- field (class: "scala.Float", name: "probability")
- root class: "TFPredictionFormat"
You can either add an explicit cast to the input data or choose a higher precision type of the field in the target object;
Run Code Online (Sandbox Code Playgroud)
而且,特别是对于该phrases领域(检查案例类[2]),它得到了
org.apache.spark.sql.AnalysisException: cannot resolve '`phrases`' due to data type mismatch: cannot cast StringType to ArrayType(StringType,true);
Run Code Online (Sandbox Code Playgroud)
如果我将我的case类[2]中的所有字段定义为String类型,那么一切正常,但这不是我想要的.有没有一种简单的方法可以做到[3]?
参考
[1]一个示例行
B017NX63A2,Merrell,"['merrell_for_men', 'merrell_mens_shoes', 'merrel']",merrell_shoes,0.0806054356579781
Run Code Online (Sandbox Code Playgroud)
[2]我的代码片段如下 …
我有一个包含可以包含整数值的数组类型列的数据框。如果没有值,它将只包含一个值,它将是空值
重要提示:请注意该列不会为空,而是具有单个值的数组;空值
> val df: DataFrame = Seq(("foo", Seq(Some(2), Some(3))), ("bar", Seq(None))).toDF("k", "v")
df: org.apache.spark.sql.DataFrame = [k: string, v: array<int>]
> df.show()
+---+------+
| k| v|
+---+------+
|foo|[2, 3]|
|bar|[null]|
Run Code Online (Sandbox Code Playgroud)
问题:我想获取具有空值的行。
到目前为止我尝试过的:
> df.filter(array_contains(df("v"), 2)).show()
+---+------+
| k| v|
+---+------+
|foo|[2, 3]|
+---+------+
Run Code Online (Sandbox Code Playgroud)
对于 null,它似乎不起作用
> df.filter(array_contains(df("v"), null)).show()
Run Code Online (Sandbox Code Playgroud)
org.apache.spark.sql.AnalysisException:
v由于数据类型不匹配而无法解析 'array_contains( , NULL)': Null 类型值不能用作参数;
或者
> df.filter(array_contains(df("v"), None)).show()
Run Code Online (Sandbox Code Playgroud)
java.lang.RuntimeException: 不支持的文字类型类 scala.None$ 无
我有一个包含一列数组字符串的Dataframe A.
...
|-- browse: array (nullable = true)
| |-- element: string (containsNull = true)
...
Run Code Online (Sandbox Code Playgroud)
例如,三个样本行
+---------+--------+---------+
| column 1| browse| column n|
+---------+--------+---------+
| foo1| [X,Y,Z]| bar1|
| foo2| [K,L]| bar2|
| foo3| [M]| bar3|
Run Code Online (Sandbox Code Playgroud)
另一个包含一列字符串的Dataframe B.
|-- browsenodeid: string (nullable = true)
Run Code Online (Sandbox Code Playgroud)
它的一些样本行
+------------+
|browsenodeid|
+------------+
| A|
| Z|
| M|
Run Code Online (Sandbox Code Playgroud)
如何过滤A以便保留browse包含browsenodeidB 中任何值的所有行?就上述例子而言,结果将是:
+---------+--=-----+---------+
| column 1| browse| column n|
+---------+--------+---------+
| foo1| [X,Y,Z]| bar1| <- because Z is a value …Run Code Online (Sandbox Code Playgroud) 嗨,大家好,我有一个函数可以从S3的某些位置加载数据集并返回有趣的数据
private def filterBrowseIndex(spark: SparkSession, s3BrowseIndex: String, mids: Seq[String] = Seq(), indices: Seq[String] = Seq()): Dataset[BrowseIndex] = {
import spark.implicits._
spark
.sparkContext.textFile(s3BrowseIndex)
// split text dataset
.map(line => line.split("\\s+"))
// get types for attributes
.map(BrowseIndex.strAttributesToBrowseIndex)
// cast it to a dataset (requires implicit conversions)
.toDS()
// pick rows for the given marketplaces
.where($"mid".isin(mids: _*))
// pick rows for the given indices
.where($"index".isin(indices: _*))
Run Code Online (Sandbox Code Playgroud)
}
如果有人提供mids = Seq()或,此实现将过滤掉所有内容indices = Seq()。另一方面,我希望语义是“仅在mids不为空的情况下应用此where子句”(与相同indices),这样,如果函数的用户提供空序列,则不会进行过滤。
有没有很好的功能方法可以做到这一点?
scala apache-spark apache-spark-sql spark-dataframe apache-spark-dataset