Via*_*nin 15 java hive scala apache-spark
In short: the following code throws OverlappingFileLockException:

FileChannel.open(Paths.get("thisfile"), StandardOpenOption.APPEND).tryLock().isValid();
FileChannel.open(Paths.get("thisfile"), StandardOpenOption.APPEND).tryLock().isShared();
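For context, a file lock in NIO is held on behalf of the whole JVM: a second `tryLock()` on the same file from the same JVM does not return `null` (that happens only when a *different* program holds the lock), it throws. A minimal sketch reproducing this, using a hypothetical scratch file name `demo.lock`:

```java
import java.io.File;
import java.io.IOException;
import java.nio.channels.FileChannel;
import java.nio.channels.FileLock;
import java.nio.channels.OverlappingFileLockException;
import java.nio.file.StandardOpenOption;

public class LockDemo {
    // Returns true when the second lock attempt in the same JVM
    // fails with OverlappingFileLockException.
    public static boolean demonstrateOverlap() {
        File f = new File("demo.lock"); // hypothetical scratch file
        try {
            f.createNewFile();
            try (FileChannel first = FileChannel.open(f.toPath(), StandardOpenOption.APPEND)) {
                FileLock lock = first.tryLock(); // first lock: succeeds
                // A second channel on the same file, in the SAME JVM,
                // does not get null from tryLock(); it throws instead.
                try (FileChannel second = FileChannel.open(f.toPath(), StandardOpenOption.APPEND)) {
                    second.tryLock();
                    return false; // no exception: not the behavior described above
                } catch (OverlappingFileLockException expected) {
                    return lock.isValid();
                } finally {
                    lock.release();
                }
            }
        } catch (IOException e) {
            return false;
        } finally {
            f.delete();
        }
    }

    public static void main(String[] args) {
        System.out.println("overlap detected: " + demonstrateOverlap());
    }
}
```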
Some background on my problem: when I run Spark with Hive on Windows, everything works, but every time Spark shuts down it fails to delete one of its temporary directories (the other temporary directories before it are deleted correctly) and prints the following exception:
2015-12-11 15:04:36 [Thread-13] INFO org.apache.spark.SparkContext - Successfully stopped SparkContext
2015-12-11 15:04:36 [Thread-13] INFO o.a.spark.util.ShutdownHookManager - Shutdown hook called
2015-12-11 15:04:36 [Thread-13] INFO o.a.spark.util.ShutdownHookManager - Deleting directory C:\Users\MyUser\AppData\Local\Temp\spark-9d564520-5370-4834-9946-ac5af3954032
2015-12-11 15:04:36 [Thread-13] INFO o.a.spark.util.ShutdownHookManager - Deleting directory C:\Users\MyUser\AppData\Local\Temp\spark-42b70530-30d2-41dc-aff5-8d01aba38041
2015-12-11 15:04:36 [Thread-13] ERROR o.a.spark.util.ShutdownHookManager - Exception while deleting Spark temp dir: C:\Users\MyUser\AppData\Local\Temp\spark-42b70530-30d2-41dc-aff5-8d01aba38041
java.io.IOException: Failed to delete: C:\Users\MyUser\AppData\Local\Temp\spark-42b70530-30d2-41dc-aff5-8d01aba38041
at org.apache.spark.util.Utils$.deleteRecursively(Utils.scala:884) [spark-core_2.11-1.5.0.jar:1.5.0]
at org.apache.spark.util.ShutdownHookManager$$anonfun$1$$anonfun$apply$mcV$sp$3.apply(ShutdownHookManager.scala:63) [spark-core_2.11-1.5.0.jar:1.5.0]
at org.apache.spark.util.ShutdownHookManager$$anonfun$1$$anonfun$apply$mcV$sp$3.apply(ShutdownHookManager.scala:60) [spark-core_2.11-1.5.0.jar:1.5.0]
at scala.collection.mutable.HashSet.foreach(HashSet.scala:78) [scala-library-2.11.6.jar:na]
at org.apache.spark.util.ShutdownHookManager$$anonfun$1.apply$mcV$sp(ShutdownHookManager.scala:60) [spark-core_2.11-1.5.0.jar:1.5.0]
at org.apache.spark.util.SparkShutdownHook.run(ShutdownHookManager.scala:264) [spark-core_2.11-1.5.0.jar:1.5.0]
at org.apache.spark.util.SparkShutdownHookManager$$anonfun$runAll$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(ShutdownHookManager.scala:234) [spark-core_2.11-1.5.0.jar:1.5.0]
at org.apache.spark.util.SparkShutdownHookManager$$anonfun$runAll$1$$anonfun$apply$mcV$sp$1.apply(ShutdownHookManager.scala:234) [spark-core_2.11-1.5.0.jar:1.5.0]
at org.apache.spark.util.SparkShutdownHookManager$$anonfun$runAll$1$$anonfun$apply$mcV$sp$1.apply(ShutdownHookManager.scala:234) [spark-core_2.11-1.5.0.jar:1.5.0]
at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1699) [spark-core_2.11-1.5.0.jar:1.5.0]
at org.apache.spark.util.SparkShutdownHookManager$$anonfun$runAll$1.apply$mcV$sp(ShutdownHookManager.scala:234) [spark-core_2.11-1.5.0.jar:1.5.0]
at org.apache.spark.util.SparkShutdownHookManager$$anonfun$runAll$1.apply(ShutdownHookManager.scala:234) [spark-core_2.11-1.5.0.jar:1.5.0]
at org.apache.spark.util.SparkShutdownHookManager$$anonfun$runAll$1.apply(ShutdownHookManager.scala:234) [spark-core_2.11-1.5.0.jar:1.5.0]
at scala.util.Try$.apply(Try.scala:191) [scala-library-2.11.6.jar:na]
at org.apache.spark.util.SparkShutdownHookManager.runAll(ShutdownHookManager.scala:234) [spark-core_2.11-1.5.0.jar:1.5.0]
at org.apache.spark.util.SparkShutdownHookManager$$anon$2.run(ShutdownHookManager.scala:216) [spark-core_2.11-1.5.0.jar:1.5.0]
at org.apache.hadoop.util.ShutdownHookManager$1.run(ShutdownHookManager.java:54) [hadoop-common-2.4.1.jar:na]
I searched the Internet and found an in-progress Spark issue (one user tried to submit a patch, but, if I read the comments on that pull request correctly, it does not work) and a few unanswered questions on SO.

It looks like the problem is in the deleteRecursively() method of the Utils.scala class. I set a breakpoint on this method and rewrote it in Java:
import java.io.File;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

public class Test {

    public static void deleteRecursively(File file) {
        if (file != null) {
            try {
                if (file.isDirectory()) {
                    for (File child : listFilesSafely(file)) {
                        deleteRecursively(child);
                    }
                    //ShutdownHookManager.removeShutdownDeleteDir(file)
                }
            } finally {
                if (!file.delete()) {
                    if (file.exists()) {
                        throw new RuntimeException("Failed to delete: " + file.getAbsolutePath());
                    }
                }
            }
        }
    }

    private static List<File> listFilesSafely(File file) {
        if (file.exists()) {
            File[] files = file.listFiles();
            if (files == null) {
                throw new RuntimeException("Failed to list files for dir: " + file);
            }
            return Arrays.asList(files);
        } else {
            return Collections.emptyList();
        }
    }

    public static void main(String[] args) {
        deleteRecursively(new File("C:\\Users\\MyUser\\AppData\\Local\\Temp\\spark-9ba0bb0c-1e20-455d-bc1f-86c696661ba3"));
    }
}
When Spark stopped at the breakpoint in this method, I found that a Spark thread's JVM had locked the file C:\Users\MyUser\AppData\Local\Temp\spark-9ba0bb0c-1e20-455d-bc1f-86c696661ba3\metastore\db.lck. Windows Process Explorer also showed that Java had locked this file, and FileChannel confirmed that the file was locked from within the JVM.
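The FileChannel check mentioned above can distinguish the three cases. This is a hedged sketch (the return strings are my own labels, not an API): `tryLock()` returns `null` when another process holds the lock, and throws OverlappingFileLockException when the lock is already held somewhere inside the same JVM, which is what happened with db.lck:

```java
import java.io.IOException;
import java.nio.channels.FileChannel;
import java.nio.channels.FileLock;
import java.nio.channels.OverlappingFileLockException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;

public class LockProbe {
    // Probes whether a file is currently locked, and by whom.
    public static String probe(Path path) {
        try (FileChannel ch = FileChannel.open(path, StandardOpenOption.WRITE)) {
            FileLock lock = ch.tryLock();
            if (lock == null) {
                return "locked by another process";
            }
            lock.release();
            return "not locked";
        } catch (OverlappingFileLockException e) {
            return "locked by this JVM"; // the db.lck case
        } catch (IOException e) {
            // On Windows, a handle opened exclusively elsewhere can also surface here.
            return "not accessible: " + e.getMessage();
        }
    }

    // Sanity check: a freshly created temp file should report "not locked".
    public static boolean selfTest() {
        try {
            Path p = Files.createTempFile("probe", ".lck");
            boolean ok = "not locked".equals(probe(p));
            Files.deleteIfExists(p);
            return ok;
        } catch (IOException e) {
            return false;
        }
    }
}
```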
Now I need to:

1. Find out which thread/class has locked this file.
2. Find out which locking mechanism Spark uses to lock metastore\db.lck, which class does it, and how to unlock the file before shutdown.
3. Submit a pull request to Spark or Hive that unlocks this file (metastore\db.lck) before the deleteRecursively() method is called, or at least leave a comment about the problem.

If you need any other information, please ask in the comments.
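For reference, db.lck is the instance lock file of the embedded Apache Derby database behind the default Hive metastore, and Derby releases it when the embedded engine shuts down. A sketch of forcing that shutdown, assuming Derby is on the classpath and that a suitable place exists to call it before Spark's shutdown hook runs (the shutdown URL and the XJ015 SQLState are standard Derby behavior; the integration point is an assumption):

```java
import java.sql.DriverManager;
import java.sql.SQLException;

public class DerbyShutdown {
    // Embedded Derby signals a successful engine shutdown by throwing
    // SQLException with SQLState "XJ015"; db.lck is released at that point.
    // Returns false when Derby is absent or the shutdown did not complete.
    public static boolean shutdownEmbeddedDerby() {
        try {
            DriverManager.getConnection("jdbc:derby:;shutdown=true");
        } catch (SQLException e) {
            return "XJ015".equals(e.getSQLState());
        }
        return false;
    }
}
```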
小智 5
Re: how to find which Java/Scala thread has locked a file.

I had the same kind of problem and found this solution: you can at least see all the locked objects in the Thread.threadLocals field.

If a file was locked by the following code:
File newFile = new File("newFile.lock");
newFile.createNewFile();
FileLock fileLock = FileChannel.open(Paths.get(newFile.getAbsolutePath()), StandardOpenOption.APPEND).tryLock();
then in Thread.threadLocals you can see an instance of the sun.nio.fs.NativeBuffer class with the field owner = ".../newFile.lock".

So you can try the following code, which returns all threads along with every object held in their threadLocals; you then need to look for threads holding a NativeBuffer instance, Spark/Hive objects, and so on (and afterwards inspect that thread's threadLocals in Eclipse or IDEA debug mode):
private static String getThreadsLockFile() {
    Set<Thread> threads = Thread.getAllStackTraces().keySet();
    StringBuilder builder = new StringBuilder();
    for (Thread thread : threads) {
        builder.append(getThreadsLockFile(thread));
    }
    return builder.toString();
}

private static String getThreadsLockFile(Thread thread) {
    StringBuffer stringBuffer = new StringBuffer();
    try {
        Field field = thread.getClass().getDeclaredField("threadLocals");
        field.setAccessible(true);
        Object map = field.get(thread);
        Field table = Class.forName("java.lang.ThreadLocal$ThreadLocalMap").getDeclaredField("table");
        table.setAccessible(true);
        Object tbl = table.get(map);
        int length = Array.getLength(tbl);
        for (int i = 0; i < length; i++) {
            try {
                Object entry = Array.get(tbl, i);
                if (entry != null) {
                    Field valueField = Class.forName("java.lang.ThreadLocal$ThreadLocalMap$Entry").getDeclaredField("value");
                    valueField.setAccessible(true);
                    Object value = valueField.get(entry);
                    if (value != null) {
                        stringBuffer.append(thread.getName()).append(" : ").append(value.getClass())
                                .append(" ").append(value).append("\n");
                    }
                }
            } catch (Exception exp) {
                // skip, do nothing
            }
        }
    } catch (Exception exp) {
        // skip, do nothing
    }
    return stringBuffer.toString();
}
Or you can try the following code, which only looks for NativeBuffer instances whose owner field matches the file name (so it does not work in all cases):
private static String getThreadsLockFile(String fileName) {
    Set<Thread> threads = Thread.getAllStackTraces().keySet();
    StringBuilder builder = new StringBuilder();
    for (Thread thread : threads) {
        builder.append(getThreadsLockFile(thread, fileName));
    }
    return builder.toString();
}

private static String getThreadsLockFile(Thread thread, String fileName) {
    StringBuffer stringBuffer = new StringBuffer();
    try {
        Field field = thread.getClass().getDeclaredField("threadLocals");
        field.setAccessible(true);
        Object map = field.get(thread);
        Field table = Class.forName("java.lang.ThreadLocal$ThreadLocalMap").getDeclaredField("table");
        table.setAccessible(true);
        Object tbl = table.get(map);
        int length = Array.getLength(tbl);
        for (int i = 0; i < length; i++) {
            try {
                Object entry = Array.get(tbl, i);
                if (entry != null) {
                    Field valueField = Class.forName("java.lang.ThreadLocal$ThreadLocalMap$Entry").getDeclaredField("value");
                    valueField.setAccessible(true);
                    Object value = valueField.get(entry);
                    if (value != null) {
                        int length1 = Array.getLength(value);
                        for (int j = 0; j < length1; j++) {
                            try {
                                Object entry1 = Array.get(value, j);
                                Field ownerField = Class.forName("sun.nio.fs.NativeBuffer").getDeclaredField("owner");
                                ownerField.setAccessible(true);
                                String owner = ownerField.get(entry1).toString();
                                if (owner.contains(fileName)) {
                                    stringBuffer.append(thread.getName());
                                }
                            } catch (Exception exp) {
                                // skip, do nothing
                            }
                        }
                    }
                }
            } catch (Exception exp) {
                // skip, do nothing
            }
        }
    } catch (Exception exp) {
        // skip, do nothing
    }
    return stringBuffer.toString();
}