PIG: Python UDF streaming error

Ami*_*mit 5 hadoop apache-pig python-3.x

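When I call a Python UDF from Pig, I get the error below in the task attempt log and the errors further down on the console. I can't work out what I'm doing wrong, because the same code worked before. Environment: RHEL 6.4 on a 64-bit machine, Hadoop 2.7.2, Pig 0.15 and Python 3.5.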

Traceback (most recent call last):
  File "/tmp/controller2772959444531928936.py", line 356, in <module>
    sys.argv[5], sys.argv[6], sys.argv[7], sys.argv[8])
  File "/tmp/controller2772959444531928936.py", line 88, in main
    input_str = self.get_next_input()
  File "/tmp/controller2772959444531928936.py", line 164, in get_next_input
    while input_str.endswith(END_RECORD_DELIM) == False:
TypeError: endswith first arg must be bytes or a tuple of bytes, not str

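The last frame is a plain Python 3 bytes/str mismatch: in Pig's generated controller script, `input_str` is evidently a `bytes` object while `END_RECORD_DELIM` is a `str`, and Python 3 (unlike Python 2, where `str` is a byte string) refuses to mix the two. The sketch below reproduces the same `TypeError` outside Pig and shows that decoding first avoids it. The delimiter value and the helper `ends_with_delim` are hypothetical; the controller's real `END_RECORD_DELIM` is not shown in the question.

```python
# Hypothetical delimiter; the real END_RECORD_DELIM in Pig's controller is not shown here.
END_RECORD_DELIM = '|_\n'


def ends_with_delim(input_str, delim=END_RECORD_DELIM):
    """Return True if input_str ends with delim, accepting either bytes or str input."""
    if isinstance(input_str, bytes):
        # Python 3 will not compare bytes with str, so decode the bytes first.
        input_str = input_str.decode('utf-8')
    return input_str.endswith(delim)


if __name__ == '__main__':
    line = b'some record|_\n'  # what a binary-mode stream returns in Python 3
    try:
        line.endswith(END_RECORD_DELIM)  # bytes.endswith(str): same TypeError as the traceback
    except TypeError as exc:
        print('TypeError:', exc)
    print(ends_with_delim(line))  # True
```
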
This is the error on the console:

java.lang.Exception: org.apache.pig.impl.streaming.StreamingUDFException: LINE :
    at org.apache.hadoop.mapred.LocalJobRunner$Job.runTasks(LocalJobRunner.java:462)
    at org.apache.hadoop.mapred.LocalJobRunner$Job.run(LocalJobRunner.java:522)
Caused by: org.apache.pig.impl.streaming.StreamingUDFException: LINE :
    at org.apache.pig.impl.builtin.StreamingUDF$ProcessErrorThread.run(StreamingUDF.java:503)

And this error as well:

Exception in thread "Thread-35" java.lang.NullPointerException
    at org.apache.pig.impl.builtin.StreamingUDF$ProcessOutputThread.run(StreamingUDF.java:468)

Here is the Python code:

import os

from pig_util import outputSchema  # decorator module Pig provides to streaming_python UDFs

# INTERNATIONALGTPATH (path to the prefix file) is assumed to be defined elsewhere
# in udf_smsiuc.py; its value is not shown in the question.

@outputSchema('output_field_name:chararray')
def readfileinlist(filename):
    # Read the whole file and return its lines without trailing newlines
    with open(filename) as inputfile:
        lines = inputfile.read().splitlines()
    return lines

@outputSchema('output_field_name:boolean')
def intlgtinlist(srcgt, destgt, intgtllist):
    # str.startswith accepts a tuple and is True if any prefix matches
    prefixes = tuple(intgtllist)
    return srcgt.startswith(prefixes) or destgt.startswith(prefixes)

@outputSchema('output_field_name:boolean')
def checkintlgtincdrs(aparty, srcgt, destgt):
    try:
        if (srcgt or destgt) and aparty.isdigit():
            if (os.path.isfile(INTERNATIONALGTPATH)
                    and os.access(INTERNATIONALGTPATH, os.R_OK)
                    and os.stat(INTERNATIONALGTPATH).st_size > 0):
                # Read the prefix file into a list
                intgtllist = readfileinlist(INTERNATIONALGTPATH)
                # Check whether either gateway starts with one of the prefixes
                return intlgtinlist(srcgt, destgt, intgtllist)
            return False
        return False
    # A tuple catches both; "except OSError or IndexError" only catches OSError
    except (OSError, IndexError):
        pass

    return True
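
To check whether the UDF logic itself behaves as expected (independently of Pig's controller), the check can be exercised in plain Python 3 with a no-op stand-in for `outputSchema`. This is a minimal sketch under stated assumptions: `check_prefixes`, the `prefix_path` parameter (replacing `INTERNATIONALGTPATH`) and the sample prefixes are hypothetical. It also shows why `except OSError or IndexError` only catches `OSError`. Unlike the original, it returns `False` when an exception is caught and skips blank lines in the prefix file (an empty prefix would match every number); both are choices made for the sketch.

```python
import os
import tempfile

# `except OSError or IndexError:` first evaluates `OSError or IndexError`,
# which is just OSError (a class is truthy), so IndexError is never caught.
assert (OSError or IndexError) is OSError


def outputSchema(schema):
    """Hypothetical no-op stand-in for pig_util.outputSchema, so the code runs outside Pig."""
    def wrap(func):
        return func
    return wrap


@outputSchema('output_field_name:boolean')
def check_prefixes(aparty, srcgt, destgt, prefix_path):
    """Simplified re-statement of checkintlgtincdrs; prefix_path replaces INTERNATIONALGTPATH."""
    try:
        if (srcgt or destgt) and aparty.isdigit():
            if (os.path.isfile(prefix_path)
                    and os.access(prefix_path, os.R_OK)
                    and os.stat(prefix_path).st_size > 0):
                with open(prefix_path) as f:
                    # Skip blank lines: '' is a prefix of every string and would match everything.
                    prefixes = tuple(line for line in f.read().splitlines() if line)
                return srcgt.startswith(prefixes) or destgt.startswith(prefixes)
        return False
    except (OSError, IndexError):  # a tuple of classes catches either exception
        return False


if __name__ == '__main__':
    with tempfile.NamedTemporaryFile('w', suffix='.txt', delete=False) as tmp:
        tmp.write('0044\n0091\n')  # made-up prefixes for the demo
        path = tmp.name
    try:
        print(check_prefixes('923001234567', '0044207', '', path))   # True
        print(check_prefixes('923001234567', '0300', '0301', path))  # False
    finally:
        os.remove(path)
```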

Here is the Pig script:

record = LOAD '/inreport/cdrs/ZTE_20160301*' USING PigStorage('|','-tagFile');
REGISTER 'udf_smsiuc.py' using streaming_python as smsiucudfs;
internationalcdrsfilter = FILTER record by smsiucudfs.checkintlgtincdrs($1,$26,$27);
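
With `PigStorage('|', '-tagFile')`, Pig prepends the input file name to each tuple as field `$0`, so `$1`, `$26` and `$27` are the 1st, 26th and 27th `|`-separated columns of the raw CDR line. A quick way to see exactly which strings reach `checkintlgtincdrs` is to mimic that layout in plain Python. `pig_fields`, the sample record and the file name are hypothetical, and the sketch is only an approximation (no schema, type or escape handling).

```python
def pig_fields(line, source_file, delimiter='|'):
    """Approximate the tuple PigStorage(delimiter, '-tagFile') builds: file name first, then the columns."""
    return [source_file] + line.rstrip('\n').split(delimiter)


if __name__ == '__main__':
    # Made-up 28-column record and file name, only to show the index shift.
    raw = '|'.join('col%d' % i for i in range(1, 29)) + '\n'
    t = pig_fields(raw, 'ZTE_20160301_sample.txt')
    aparty, srcgt, destgt = t[1], t[26], t[27]  # what $1, $26, $27 refer to
    print(aparty, srcgt, destgt)  # col1 col26 col27
```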