/// <summary>
/// Creates the CsvHelper <c>Configuration</c> used when reading product CSV files.
/// </summary>
public interface ICsvProductReaderConfigurationFactory
{
/// <summary>Builds a fully configured CsvHelper configuration instance.</summary>
Configuration Build();
}
public class CsvProductReaderConfigurationFactory : ICsvProductReaderConfigurationFactory
{
private readonly ClassMap<ProductDto> classMap;
/// <summary>
/// Resolves and caches the <c>ProductDto</c> class map from the given provider.
/// </summary>
/// <param name="classMapProvider">Supplies the CsvHelper class map; must not be null.</param>
/// <exception cref="System.ArgumentNullException">
/// Thrown when <paramref name="classMapProvider"/> is null — fail fast here instead of
/// surfacing a NullReferenceException at first use.
/// </exception>
public CsvProductReaderConfigurationFactory(IProductDtoClassMapProvider classMapProvider)
{
    if (classMapProvider == null)
        throw new System.ArgumentNullException(nameof(classMapProvider));

    classMap = classMapProvider.Get();
}
public Configuration Build()
{
var config = new Configuration
{
Delimiter = "\t",
HasHeaderRecord = true,
IgnoreQuotes = true,
MissingFieldFound = (rows, fieldIndex, readingContext) =>
Log.Warn($"Missing Field Found at line {readingContext.Row}\r\n" +
$"Field at index {fieldIndex} does not exist\r\n" +
$"Raw record: {readingContext.RawRecord}"),
BadDataFound = context =>
Log.Warn($"Bad data found at row {context.Row}\r\n" +
$"Raw record: …（此处代码片段被截断）

问题：dataFrame.coalesce(1).write().save("path") 有时仅写入 _SUCCESS 和 ._SUCCESS.crc 文件，即使输入非空，也没有生成预期的 *.csv.gz 文件。

文件保存代码：
/**
 * Writes {@code dataFrame} as a single gzip-compressed, tab-delimited CSV file
 * (plus Spark's _SUCCESS marker) into {@code directory}, replacing previous output.
 *
 * NOTE(review): coalesce(1) funnels all rows through one task — fine for small
 * outputs, a bottleneck for large ones.
 *
 * NOTE(review): the target uses the "file:" scheme (local filesystem). With a
 * non-local Spark master, executors write part files to THEIR machines' disks,
 * so the driver-side directory can end up holding only _SUCCESS. The directory
 * must be on storage shared by driver and executors (or use hdfs:/s3: URIs) for
 * the *.csv.gz part file to appear where the caller looks for it — confirm the
 * deploy mode before relying on this method.
 *
 * @param dataFrame data to persist
 * @param directory destination directory on the local filesystem
 */
private static void writeCsvToDirectory(Dataset<Row> dataFrame, Path directory) {
    dataFrame.coalesce(1)
            .write()
            .format("csv")
            .option("header", "true")
            .option("delimiter", "\t")
            .option("codec", "org.apache.hadoop.io.compress.GzipCodec")
            .mode(SaveMode.Overwrite)
            // Path.toUri() yields a well-formed "file:" URI; the previous
            // "file:///" + directory produced four slashes for absolute POSIX
            // paths and breaks on Windows-style paths.
            .save(directory.toUri().toString());
}
文件获取代码：
/**
 * Returns the first file in {@code directory} whose name matches {@code *.csv.gz}.
 * If several files match, which one is returned is unspecified (directory order).
 *
 * @param directory directory produced by the Spark CSV writer
 * @return path of a matching file
 * @throws RuntimeException if no file matches (message built by
 *         getNoSuchElementExceptionMessage, including a directory listing)
 * @throws IOException if the directory cannot be opened or read
 */
static Path getTemporaryCsvFile(Path directory) throws IOException {
    String glob = "*.csv.gz";
    try (DirectoryStream<Path> stream = Files.newDirectoryStream(directory, glob)) {
        // Check for emptiness explicitly instead of catching
        // NoSuchElementException: exceptions are not control flow, and the old
        // catch would also have masked an NSEE thrown for any other reason.
        for (Path match : stream) {
            return match;
        }
    }
    throw new RuntimeException(getNoSuchElementExceptionMessage(directory, glob));
}
文件获取失败的错误示例：
java.lang.RuntimeException: directory /tmp/temp5889805853850415940 does not contain a file with glob *.csv.gz. Directory listing:
/tmp/temp5889805853850415940/_SUCCESS,
/tmp/temp5889805853850415940/._SUCCESS.crc

另一个问题：我从 HDFS 读取数据，并希望获取这些数据所来自文件的元数据，以便构建反映某一时刻可用数据情况的报告。
我找到了使用 org.apache.hadoop.fs.FileSystem 获取全部文件列表的方案。我了解分区规则，可以根据获得的列表构建 row -> meta 的映射。
但这个方案似乎难以实现和维护。是否有更简单的方法达到同样的效果？
这个问题与已有的一个问题非常相似，唯一的区别是我在 Docker 中运行 Airflow。
一步步:
复现步骤：在 requirements.txt 中声明依赖；在 PyCharm 项目中执行 docker-compose up；DAG 运行时报 ModuleNotFoundError。我希望通过 docker-compose 和 requirements.txt 安装依赖，使 PyCharm 解释器和 DAG 执行时都能使用这些依赖。
有没有不需要重建图像的解决方案?