我正在尝试读取本地 Parquet 文件,但是我能找到的唯一 API 与 Hadoop 紧密耦合,并且需要 HadoopPath作为输入(即使是指向本地文件)。
ParquetReader<GenericRecord> reader = AvroParquetReader.<GenericRecord>builder(file).build();
GenericRecord nextRecord = reader.read();
Run Code Online (Sandbox Code Playgroud)
是如何在独立的 Java 代码中读取镶木地板文件的最流行的答案?,但需要一个 HadoopPath并且现在已被弃用为一个神秘的InputFile代替。InputFile我能找到的唯一实现是HadoopInputFile,所以再次没有帮助。
在 Avro 这很简单:
DatumReader<GenericRecord> datumReader = new GenericDatumReader<>();
this.dataFileReader = new DataFileReader<>(file, datumReader);
Run Code Online (Sandbox Code Playgroud)
(文件在哪里java.io.File)。什么是 Parquet 等价物?
我Path在答案中要求没有 Hadoop依赖,因为 Hadoop 会导致膨胀和 jar 地狱,并且需要它来读取本地文件似乎很愚蠢。
为了进一步解释背景故事,我维护了一个小的IntelliJ 插件,它允许用户将 Avro 文件拖放到一个窗格中以便在表格中查看。这个插件目前是 5MB。如果我包含 Parquet 和 Hadoop 依赖项,它会膨胀到超过 50MB,甚至无法工作。
回复后附录
现在我已经开始工作了(感谢接受的答案),这是我的工作解决方案,它避免了严重依赖 Hadoop PathAPI可能会拖入的所有烦人的错误:
我正在使用 KCL (v2) 将 Kafka 消费者转换为 AWS Kinesis 消费者。在 Kafka 中,偏移量用于帮助消费者跟踪其最近使用的消息。如果我的 Kafka 应用程序死掉,它将使用偏移量从它重新启动时停止的地方开始消费。
然而,这在 Kinesis 中并不相同。我可以设置,kinesisClientLibConfiguration.withInitialPositionInStream(...)但唯一的参数是TRIM_HORIZON,LATEST或AT_TIMESTAMP。如果我的 Kinesis 应用程序死掉,它在重新启动时将不知道从哪里恢复消费。
我的 KCL 消费者非常简单。该main()方法看起来像:
KinesisClientLibConfiguration config = new KinesisClientLibConfiguration("benTestApp",
"testStream", new DefaultAWSCredentialsProviderChain(), UUID.randomUUID().toString());
config.withInitialPositionInStream(InitialPositionInStream.TRIM_HORIZON);
Worker worker = new Worker.Builder()
.recordProcessorFactory(new KCLRecordProcessorFactory())
.config(config)
.build();
Run Code Online (Sandbox Code Playgroud)
这RecordProcessor是一个简单的实现:
@Override
public void initialize(InitializationInput initializationInput) {
LOGGER.info("Initializing record processor for shard: {}", initializationInput.getShardId());
}
@Override
public void processRecords(ProcessRecordsInput processRecordsInput) {
List<Record> records = processRecordsInput.getRecords();
LOGGER.info("Retrieved {} records", …Run Code Online (Sandbox Code Playgroud) 以下两种保留配置有什么区别?
offsets.retention.minuteslog.retention.minutes我不明白它是如何相互关联或相互关联的.根据我的理解,一旦移除了偏移量,就无法访问日志中的记录,反之亦然.有什么我误解了吗?
我编写了一个 JUnit 测试,它使用 Mockito 和 PowerMock 来模拟一些类。我正在尝试将其转换为 Cucumber 测试,但静态 PowerMock 功能不起作用。
两个相关 Cucumber 类的摘录:
赛跑者
@RunWith(Cucumber.class)
public class JWTValidatorBDDTest {
}
Run Code Online (Sandbox Code Playgroud)
步骤类
public class JWTValidatorCukeTest {
String tokenValue;
JWTValidator jwtValidator;
MockHttpServletRequest mockRequest;
@Before
public void before() throws IOException {
this.mockRequest = new MockHttpServletRequest();
PowerMockito.mockStatic(JWTAuthConnectionManager.class);
BDDMockito.given(JWTAuthConnectionManager.postToken(anyString(), anyString(), anyString())).willReturn(200);
Mockito.doReturn(200).when(JWTAuthConnectionManager.postToken(anyString(), anyString(), anyString()));
}
@Given("^a JWT token with the value (.*)")
public void a_JWT_token_with_the_value_(String token) {
this.jwtValidator = new JWTValidator("https://test.7uj67hgfh.com/openam", "Authorization", "Bearer");
this.tokenValue = token;
}
Run Code Online (Sandbox Code Playgroud)
虽然这段代码在 JUnit 测试中工作,但它在这里失败了 - 它进入JWTAuthConnectionManager.postToken()应该被模拟的方法,然后通过在那里执行代码而失败。我试过添加以下几行: …
我无法让Docker Compose hostname命令工作.
我正在运行一个简单的docker-compose.yml:
version: '3'
services:
redis1:
image: "redis:alpine"
hostname: redis1host
redis2:
image: "redis:alpine"
hostname: redis2host
Run Code Online (Sandbox Code Playgroud)
一旦我运行它docker-compose up,我应该能够运行docker-compose exec redis1 /bin/ash,然后ping redis2host与其他Redis容器交谈,但ping只是没有到达目的地.我可以ping其他Redis容器ping redis2.
ping redishost2 应该工作,不是吗?
我正在努力理解 JavaScript 中的 Array.reduce() 。我有一个对象数组,我试图对其应用 .reduce() ,但我得到了一个单字母字符串数组。
目标:
["Stuff", "necklace", "ring", "bracelet"]
Run Code Online (Sandbox Code Playgroud)
当前对象数组
const productArray =
[
{
id: 1,
productTitle: "Necklace"
},
{
id: 2,
productTitle: "Ring"
},
{
id: 3,
productTitle: "Bracelet"
}
]
Run Code Online (Sandbox Code Playgroud)
函数调用
const newStuff = productArray.reduce(function(a, currentValue) {
return [...a, ...currentValue.productTitle];
}, ["Stuff"])
Run Code Online (Sandbox Code Playgroud)
我需要做什么来指定我不希望将“productTitle”分解为单字母字符串?我一直在寻找有关对象数组上的 .reduce() 的资源,但没有发现任何非常有帮助的内容。有什么指点吗?
我正在尝试将一些包含 LogicalTypes 的 Parquet 记录写入 JSON。我通过以下方式执行此操作AvroParquetReader,这给了我一个 Avro GenericRecord:
GenericData.get().addLogicalTypeConversion(new TimeConversions.TimeMillisConversion());
try (ParquetReader<GenericRecord> parquetReader =
AvroParquetReader.<GenericRecord>builder(new LocalInputFile(this.path))
.withDataModel(GenericData.get())
.build()) {
GenericRecord record = parquetReader.read();
record.toString();
}
Run Code Online (Sandbox Code Playgroud)
record.toString()产生:
{"universe_member_id": 94639, "member_from_dt": 2001-08-31T00:00:00Z, "member_to_dt": 2200-01-01T00:00:00Z}
Run Code Online (Sandbox Code Playgroud)
请注意,这是无效的 JSON - 日期已根据其正确转换LogicalType,但没有用引号引起来。
所以我尝试了JsonEncoder:
GenericData.get().addLogicalTypeConversion(new TimeConversions.TimeMillisConversion()); //etc
OutputStream stringOutputStream = new StringOutputStream();
try (ParquetReader<GenericRecord> parquetReader =
AvroParquetReader.<GenericRecord>builder(new LocalInputFile(this.path))
.withDataModel(GenericData.get())
.build()) {
GenericRecord record = parquetReader.read();
DatumWriter<GenericRecord> writer = new GenericDatumWriter<>(record.getSchema());
JsonEncoder encoder = EncoderFactory.get().jsonEncoder(record.getSchema(), stringOutputStream);
writer.write(record, encoder);
encoder.flush();
} …Run Code Online (Sandbox Code Playgroud) 我正在研究九十九个Scala问题的 P07 :
P07 (**) Flatten a nested list structure.
Example:
scala> flatten(List(List(1, 1), 2, List(3, List(5, 8))))
res0: List[Any] = List(1, 1, 2, 3, 5, 8)
Run Code Online (Sandbox Code Playgroud)
我最初的解决方案是:
def flatten[A](ls : List[A]): List[A] = {
def flattenRec[A](ls: List[A], flatList: List[A]): List[A] = ls match {
case Nil => flatList
case head: List[A] :: tail => flattenRec(head ::: flatten(tail), flatList)
case head :: tail => flattenRec(tail, flatList :+ head)
}
flattenRec(ls, List[A]())
}
Run Code Online (Sandbox Code Playgroud)
但是这不会编译,因为我不允许head在第二个case语句中指定类型.有没有办法让我这样做?
顺便说一句,推荐的解决方案使用flatMap …