我收到此错误,但我不知道为什么。基本上我是从此代码错误:
a = data.mapPartitions(helper(locations))
Run Code Online (Sandbox Code Playgroud)
其中数据是RDD,而我的助手定义为:
def helper(iterator, locations):
for x in iterator:
c = locations[x]
yield c
Run Code Online (Sandbox Code Playgroud)
(位置只是数据点的数组)我看不出问题出在哪里,但我也不是pyspark的佼佼者,所以有人可以告诉我为什么我无法从此代码中获得“ PipelinedRDD”对象吗?
我正在尝试编写单元测试,但在存根 AWS S3 getobject 方法时遇到问题。这是我的代码:
describe('testExecuteSuccess()', function () {
it('Test execution with id is successful.', async () => {
let id = "test_id";
const getObjectStub = AWS.S3.prototype.getObject = sinon.stub();
getObjectStub.returns({ promise: () => Promise.resolve({ Body: "test" }) });
await executionHandler.execute(id).then(() => {
getObjectStub.should.have.been.calledOnce;
});
});
});
Run Code Online (Sandbox Code Playgroud)
有谁知道它有什么问题/如何正确存根 getObject 方法?当我运行测试时,我得到InvalidParameterType: Expected params.Bucket to be a string
证明存根不起作用。
这是 executionHandler.execute 方法的代码:
exports.execute = async function(curr_id) {
let params = { Bucket: BUCKET_NAME, Key: KEY_PATH }
let fileObject = await s3.getObject(params).promise();
let …
Run Code Online (Sandbox Code Playgroud)