小编Ben*_*son的帖子

在没有 Hadoop Path API 的情况下读取本地 Parquet 文件

我正在尝试读取本地 Parquet 文件,但是我能找到的唯一 API 与 Hadoop 紧密耦合,并且需要 HadoopPath作为输入(即使是指向本地文件)。

ParquetReader<GenericRecord> reader = AvroParquetReader.<GenericRecord>builder(file).build();
GenericRecord nextRecord = reader.read();
Run Code Online (Sandbox Code Playgroud)

如何在独立的 Java 代码中读取镶木地板文件的最流行的答案,但需要一个 HadoopPath并且现在已被弃用为一个神秘的InputFile代替。InputFile我能找到的唯一实现是HadoopInputFile,所以再次没有帮助。

在 Avro 这很简单:

DatumReader<GenericRecord> datumReader = new GenericDatumReader<>();
this.dataFileReader = new DataFileReader<>(file, datumReader);
Run Code Online (Sandbox Code Playgroud)

(文件在哪里java.io.File)。什么是 Parquet 等价物?

Path在答案中要求没有 Hadoop依赖,因为 Hadoop 会导致膨胀和 jar 地狱,并且需要它来读取本地文件似乎很愚蠢。

为了进一步解释背景故事,我维护了一个小的IntelliJ 插件,它允许用户将 Avro 文件拖放到一个窗格中以便在表格中查看。这个插件目前是 5MB。如果我包含 Parquet 和 Hadoop 依赖项,它会膨胀到超过 50MB,甚至无法工作


回复后附录

现在我已经开始工作了(感谢接受的答案),这是我的工作解决方案,它避免了严重依赖 Hadoop PathAPI可能会拖入的所有烦人的错误:

java hadoop parquet

9
推荐指数
1
解决办法
3862
查看次数

AWS Kinesis - 如何从上一个检查点恢复消费

我正在使用 KCL (v2) 将 Kafka 消费者转换为 AWS Kinesis 消费者。在 Kafka 中,偏移量用于帮助消费者跟踪其最近使用的消息。如果我的 Kafka 应用程序死掉,它将使用偏移量从它重新启动时停止的地方开始消费。

然而,这在 Kinesis 中并不相同。我可以设置,kinesisClientLibConfiguration.withInitialPositionInStream(...)但唯一的参数是TRIM_HORIZON,LATESTAT_TIMESTAMP。如果我的 Kinesis 应用程序死掉,它在重新启动时将不知道从哪里恢复消费。

我的 KCL 消费者非常简单。该main()方法看起来像:

KinesisClientLibConfiguration config = new KinesisClientLibConfiguration("benTestApp",
            "testStream", new DefaultAWSCredentialsProviderChain(), UUID.randomUUID().toString());
config.withInitialPositionInStream(InitialPositionInStream.TRIM_HORIZON);

Worker worker = new Worker.Builder()
            .recordProcessorFactory(new KCLRecordProcessorFactory())
            .config(config)
            .build();
Run Code Online (Sandbox Code Playgroud)

RecordProcessor是一个简单的实现:

@Override
public void initialize(InitializationInput initializationInput) {
    LOGGER.info("Initializing record processor for shard: {}", initializationInput.getShardId());
}

@Override
public void processRecords(ProcessRecordsInput processRecordsInput) {
    List<Record> records = processRecordsInput.getRecords();
    LOGGER.info("Retrieved {} records", …
Run Code Online (Sandbox Code Playgroud)

java amazon-web-services amazon-kinesis

7
推荐指数
1
解决办法
1894
查看次数

保留配置offsets.retention.minutes和log.retention.minutes之间的区别

以下两种保留配置有什么区别?

  • offsets.retention.minutes
  • log.retention.minutes

我不明白它是如何相互关联或相互关联的.根据我的理解,一旦移除了偏移量,就无法访​​问日志中的记录,反之亦然.有什么我误解了吗?

apache-kafka retention

6
推荐指数
1
解决办法
3042
查看次数

用Java读取ORC文件

你如何用Java读取ORC文件?我想在一个小文件中读取一些单元测试输出验证,但我找不到解决方案.

java hadoop orc

5
推荐指数
1
解决办法
7551
查看次数

将 PowerMock 与 Cucumber 结合使用

我编写了一个 JUnit 测试,它使用 Mockito 和 PowerMock 来模拟一些类。我正在尝试将其转换为 Cucumber 测试,但静态 PowerMock 功能不起作用。

两个相关 Cucumber 类的摘录:

赛跑者

@RunWith(Cucumber.class)
public class JWTValidatorBDDTest {
}
Run Code Online (Sandbox Code Playgroud)

步骤类

public class JWTValidatorCukeTest {
String tokenValue;
JWTValidator jwtValidator;
MockHttpServletRequest mockRequest;

@Before
public void before() throws IOException {
    this.mockRequest = new MockHttpServletRequest();
    PowerMockito.mockStatic(JWTAuthConnectionManager.class);
    BDDMockito.given(JWTAuthConnectionManager.postToken(anyString(), anyString(), anyString())).willReturn(200);
    Mockito.doReturn(200).when(JWTAuthConnectionManager.postToken(anyString(), anyString(), anyString()));
}

@Given("^a JWT token with the value (.*)")
public void a_JWT_token_with_the_value_(String token) {
    this.jwtValidator = new JWTValidator("https://test.7uj67hgfh.com/openam", "Authorization", "Bearer");
    this.tokenValue = token;
}
Run Code Online (Sandbox Code Playgroud)

虽然这段代码在 JUnit 测试中工作,但它在这里失败了 - 它进入JWTAuthConnectionManager.postToken()应该被模拟的方法,然后通过在那里执行代码而失败。我试过添加以下几行: …

java junit unit-testing cucumber powermock

5
推荐指数
1
解决办法
2789
查看次数

Docker Compose hostname命令不起作用

我无法让Docker Compose hostname命令工作.

我正在运行一个简单的docker-compose.yml:

version: '3'
services:
  redis1:
    image: "redis:alpine"
    hostname: redis1host
  redis2:
    image: "redis:alpine"
    hostname: redis2host
Run Code Online (Sandbox Code Playgroud)

一旦我运行它docker-compose up,我应该能够运行docker-compose exec redis1 /bin/ash,然后ping redis2host与其他Redis容器交谈,但ping只是没有到达目的地.我可以ping其他Redis容器ping redis2.

ping redishost2 应该工作,不是吗?

docker docker-compose

3
推荐指数
1
解决办法
1795
查看次数

对象数组上的 Array.reduce() ——返回单个字母的字符串

我正在努力理解 JavaScript 中的 Array.reduce() 。我有一个对象数组,我试图对其应用 .reduce() ,但我得到了一个单字母字符串数组。

目标:

["Stuff", "necklace", "ring", "bracelet"]
Run Code Online (Sandbox Code Playgroud)

当前对象数组

const productArray =
[
    {
        id: 1,
        productTitle: "Necklace"
    },
    {
        id: 2,
        productTitle: "Ring"
    },
    {
        id: 3,
        productTitle: "Bracelet"
    }
]
Run Code Online (Sandbox Code Playgroud)

函数调用

const newStuff = productArray.reduce(function(a, currentValue) {
    return [...a, ...currentValue.productTitle];
}, ["Stuff"])
Run Code Online (Sandbox Code Playgroud)

实际结果: 奇怪的数组

我需要做什么来指定我不希望将“productTitle”分解为单字母字符串?我一直在寻找有关对象数组上的 .reduce() 的资源,但没有发现任何非常有帮助的内容。有什么指点吗?

javascript arrays string object

2
推荐指数
1
解决办法
7015
查看次数

将 Parquet/Avro GenericRecord 写入 JSON,同时维护 LogicalTypes

我正在尝试将一些包含 LogicalTypes 的 Parquet 记录写入 JSON。我通过以下方式执行此操作AvroParquetReader,这给了我一个 Avro GenericRecord

GenericData.get().addLogicalTypeConversion(new TimeConversions.TimeMillisConversion());

try (ParquetReader<GenericRecord> parquetReader =
    AvroParquetReader.<GenericRecord>builder(new LocalInputFile(this.path))
        .withDataModel(GenericData.get())
        .build()) {
    GenericRecord record = parquetReader.read();
    record.toString();
}
Run Code Online (Sandbox Code Playgroud)

record.toString()产生:

{"universe_member_id": 94639, "member_from_dt": 2001-08-31T00:00:00Z, "member_to_dt": 2200-01-01T00:00:00Z}
Run Code Online (Sandbox Code Playgroud)

请注意,这是无效的 JSON - 日期已根据其正确转换LogicalType,但没有用引号引起来。

所以我尝试了JsonEncoder

GenericData.get().addLogicalTypeConversion(new TimeConversions.TimeMillisConversion()); //etc
OutputStream stringOutputStream = new StringOutputStream();

try (ParquetReader<GenericRecord> parquetReader =
    AvroParquetReader.<GenericRecord>builder(new LocalInputFile(this.path))
        .withDataModel(GenericData.get())
        .build()) {
    GenericRecord record = parquetReader.read();
    DatumWriter<GenericRecord> writer = new GenericDatumWriter<>(record.getSchema());
    JsonEncoder encoder = EncoderFactory.get().jsonEncoder(record.getSchema(), stringOutputStream);
    writer.write(record, encoder);
    encoder.flush();
} …
Run Code Online (Sandbox Code Playgroud)

java json hadoop avro parquet

2
推荐指数
1
解决办法
3354
查看次数

匹配类型的头/尾模式匹配参数

我正在研究九十九个Scala问题的 P07 :

P07 (**) Flatten a nested list structure.
Example:
scala> flatten(List(List(1, 1), 2, List(3, List(5, 8))))
res0: List[Any] = List(1, 1, 2, 3, 5, 8)
Run Code Online (Sandbox Code Playgroud)

我最初的解决方案是:

def flatten[A](ls : List[A]): List[A] = {
   def flattenRec[A](ls: List[A], flatList: List[A]): List[A] = ls match {
     case Nil => flatList
     case head: List[A] :: tail =>  flattenRec(head ::: flatten(tail), flatList)
     case head :: tail => flattenRec(tail, flatList :+ head)
   }
   flattenRec(ls, List[A]())
 }
Run Code Online (Sandbox Code Playgroud)

但是这不会编译,因为我不允许head在第二个case语句中指定类型.有没有办法让我这样做?

顺便说一句,推荐的解决方案使用flatMap …

scala pattern-matching

1
推荐指数
1
解决办法
106
查看次数