我有两个 docker 容器在 ubuntu 上运行,其中一个用于 hadoop namenode,另一个用于 hadoop datanode。
现在我在 Windows 上运行我的 java 代码使用 Hadoop FileSystem api 将文件从我的 Windows 文件系统复制到远程 docker hdfs 。
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.FileUtil;
import org.apache.hadoop.fs.Path;
import org.junit.Test;
import java.io.File;
public class HadoopTest {
@Test
public void testCopyFileToHDFS() throws Exception {
Configuration configuration = new Configuration();
configuration.addResource(getClass().getClassLoader().getResourceAsStream("hadoop/core-site.xml"));
configuration.addResource(getClass().getClassLoader().getResourceAsStream("hadoop/yarn-site.xml"));
FileSystem fileSystem = FileSystem.get(configuration);
FileUtil.copy(new File("c:\\windows-version.txt"),fileSystem, new Path("/testsa"), false,configuration);
}
}
Run Code Online (Sandbox Code Playgroud)
但我收到以下错误:
16:57:05.669 [Thread-4] DEBUG org.apache.hadoop.hdfs.DFSClient - Connecting to datanode 172.18.0.2:50010
16:57:15.654 [IPC Client (547201549) connection to …Run Code Online (Sandbox Code Playgroud) 例如:
DataSet<Tuple1<Long>> input = env.fromElements(1,2,3,4,5,6,7,8,9);
DataSet<Tuple1<Long>> sum = input.reduce(new ReduceFunction()<Tuple1<Long>,Tuple1<Long>>{
public Tuple1<Long> reduce(Tuple1<Long> value1,Tuple1<Long> value2){
return new Tuple1<>(value1.f0 + value2.f0);
}
}
Run Code Online (Sandbox Code Playgroud)
如果上面的reduce转换不是并行操作,我是否需要使用额外的两个转换'partitionByHash'和'mapPartition',如下所示:
DataSet<Tuple1<Long>> input = env.fromElements(1,2,3,4,5,6,7,8,9);
DataSet<Tuple1<Long>> sum = input.partitionByHash(0).mapPartition(new MapPartitionFunction()<Tuple1<Long>,Tuple1<Long>>{
public void map(Iterable<Tuple1<Long>> values,Collector<Tuple1<Long>> out){
long sum = getSum(values);
out.collect(new Tuple1(sum));
}
}).reduce(new ReduceFunction()<Tuple1<Long>,Tuple1<Long>>{
public Tuple1<Long> reduce(Tuple1<Long> value1,Tuple1<Long> value2){
return new Tuple1<>(value1.f0 + value2.f0);
}
}
Run Code Online (Sandbox Code Playgroud)
以及为什么reduce转换的结果仍然是DataSet的一个实例而不是一个实例 Tuple1<Long>
有一个名为flink-jdbc仅支持非并行元组类型的模块JDBC InputFormat.
为了使用并行InputFormatfor JDBC,似乎需要通过实现接口来定制:org.apache.flink.core.io.InputSplit.
那么在我的情况下,我如何自定义实现JdbcInputSplit从数据库并行查询数据?