在我的Flink代码中,我正在流式传输一个位于HDFS文件夹的文件,我收到错误"(没有这样的文件或目录)",但我确定文件名和地址是正确的,因为我在批处理中使用了相同的方法和每件事都顺利进行.有谁知道可能是什么问题?这是我的代码:
DataStream<FebrlObject> myStream =
env.addSource(new MyObjectGenerator("hdfs://../Data/Dataset1.csv"));
Run Code Online (Sandbox Code Playgroud)
及其相关课程:
public class MyObjectGenerator implements SourceFunction<MyObject> {
private String dataFilePath;
private float servingSpeedFactor;
private Integer rowNo ;
private transient BufferedReader reader;
private transient InputStream inputStream;
public MyObjectGenerator(String dataFilePath) {
this(dataFilePath, 1.0f);
}
public MyObjectGenerator(String dataFilePath, float servingSpeedFactor) {
this.dataFilePath = dataFilePath;
this.servingSpeedFactor = servingSpeedFactor;
rowNo = 0 ;
}
@Override
public void run(SourceContext<MyObject> sourceContext) throws Exception {
long servingStartTime = Calendar.getInstance().getTimeInMillis();
inputStream = new DataInputStream(new FileInputStream(dataFilePath));
reader = new BufferedReader(new InputStreamReader(inputStream));
String line;
long dataStartTime;
rowNo++;
if (reader.ready() && (line = reader.readLine()) != null ) {
MyObject myObject = MyObject.fromString(line);
if (febrlObject!= null )
sourceContext.collect(myObject);
} else {
return;
}
while (reader.ready() && (line = reader.readLine()) != null) {
MyObject myObject = MyObject.fromString(line);
sourceContext.collect( febrlObject );
}
this.reader.close();
this.reader = null;
this.inputStream.close();
this.inputStream = null;
}
@Override
public void cancel() {
try {
if (this.reader != null) {
this.reader.close();
}
if( this.inputStream != null) {
this.inputStream.close();
}
} catch (IOException ioe) {
//
} finally {
this.reader = null;
this.inputStream = null;
}
}
}
Run Code Online (Sandbox Code Playgroud)
您尝试使用Java常规访问HDFS中的文件FileInputStream.FileInputStream can only access the local file system. It does not know anything about talking to HDFS. You need to use the HDFS client to read files from HDFS. See Flink's以FileInputFormat`为例.
但是,如果可能的话,我会尽量避免自己实现.您可以尝试使用Flink FileInputFormat来读取文件行(返回a DataStream<String>)和解析行的连续(平面)映射器.
| 归档时间: |
|
| 查看次数: |
808 次 |
| 最近记录: |