LoveJW 发表于 2014-11-21 16:21:54

在eclipse操作数据到HDFS

本帖最后由 LoveJW 于 2014-11-21 16:23 编辑


我在eclipse里往hdfs的表里追加数据,代码是追加操作,为什么一直报该文件已存在的错误呢??

错误:
org.apache.hadoop.ipc.RemoteException: failed to create file /user/root/uair11/air_info_package_cache_/part-m-00000 for DFSClient_NONMAPREDUCE_1323857387_57 on client 192.168.22.129 because current leaseholder is trying to recreate file.
    at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.recoverLeaseInternal(FSNamesystem.java:2342)
    at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.appendFileInternal(FSNamesystem.java:2220)
    at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.appendFileInt(FSNamesystem.java:2453)
    at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.appendFile(FSNamesystem.java:2414)
    at org.apache.hadoop.hdfs.server.namenode.NameNodeRpcServer.append(NameNodeRpcServer.java:508)
    at org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolServerSideTranslatorPB.append(ClientNamenodeProtocolServerSideTranslatorPB.java:320)
    at org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos$ClientNamenodeProtocol$2.callBlockingMethod(ClientNamenodeProtocolProtos.java:48063)
    at org.apache.hadoop.ipc.ProtobufRpcEngine$Server$ProtoBufRpcInvoker.call(ProtobufRpcEngine.java:585)
    at org.apache.hadoop.ipc.RPC$Server.call(RPC.java:928)
    at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:2048)
    at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:2044)
    at java.security.AccessController.doPrivileged(Native Method)
    at javax.security.auth.Subject.doAs(Subject.java:396)
    at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1491)
    at org.apache.hadoop.ipc.Server$Handler.run(Server.java:2042)

    at org.apache.hadoop.ipc.Client.call(Client.java:1347) ~
    at org.apache.hadoop.ipc.Client.call(Client.java:1300) ~
    at org.apache.hadoop.ipc.ProtobufRpcEngine$Invoker.invoke(ProtobufRpcEngine.java:206) ~
    at com.sun.proxy.$Proxy65.append(Unknown Source) ~
    at sun.reflect.GeneratedMethodAccessor97.invoke(Unknown Source) ~
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:25) ~
    at java.lang.reflect.Method.invoke(Method.java:597) ~
    at org.apache.hadoop.io.retry.RetryInvocationHandler.invokeMethod(RetryInvocationHandler.java:186) ~
    at org.apache.hadoop.io.retry.RetryInvocationHandler.invoke(RetryInvocationHandler.java:102) ~
    at com.sun.proxy.$Proxy65.append(Unknown Source) ~
    at org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolTranslatorPB.append(ClientNamenodeProtocolTranslatorPB.java:245) ~
    at org.apache.hadoop.hdfs.DFSClient.callAppend(DFSClient.java:1480) ~
    at org.apache.hadoop.hdfs.DFSClient.append(DFSClient.java:1520) ~
    at org.apache.hadoop.hdfs.DFSClient.append(DFSClient.java:1508) ~
    at org.apache.hadoop.hdfs.DistributedFileSystem$4.doCall(DistributedFileSystem.java:310) ~
    at org.apache.hadoop.hdfs.DistributedFileSystem$4.doCall(DistributedFileSystem.java:306) ~
    at org.apache.hadoop.fs.FileSystemLinkResolver.resolve(FileSystemLinkResolver.java:81) ~
    at org.apache.hadoop.hdfs.DistributedFileSystem.append(DistributedFileSystem.java:306) ~
    at org.apache.hadoop.fs.FileSystem.append(FileSystem.java:1160) ~
    at cn.lds.uAir.transwarp.Impl.HDFSFileImpl.appendDataToFile(HDFSFileImpl.java:49) ~
    at cn.lds.uAir.job.MSAQIJob$DataThread.run(MSAQIJob.java:99)

sstutu 发表于 2014-11-21 17:19:40

LoveJW 发表于 2014-11-21 16:55
大哥,能说说吗??

可以尝试下,不过你先贴出代码来,让我一个个敲出来{:soso_e114:},唉


你的FileSystem fs 初始化改改
你的是这种形式:
FileSystem fs = FileSystem.get(conf);
尝试改成这种格式
FileSystem fs = FileSystem.get(URI.create(hdfs_path), conf);



你的是这种格式:
InputStream in = new BufferedInputStream(new ByteArrayInputStream(data.getByte)));
改成这种格式:
InputStream in = new BufferedInputStream(new FileInputStream(file));//要追加的文件流,file为文件


记得改成文件,而非是字符串

你的是这种格式:
OutputStream out = fs.append(filepath);
改成这种格式:
OutputStream out = fs.append(new Path(hdfs_path));
上面没有经过编译器,直接手写出来的,根据自己的情况改改






sstutu 发表于 2014-11-21 16:35:11

本帖最后由 sstutu 于 2014-11-21 16:36 编辑

楼主两个参数有误,导致系统认为是重新创建文件

String hdfs_path= "hdfs://ip:xx/file/fileuploadFileName";//文件路径
Configuration conf = new Configuration();
FileSystem fs = FileSystem.get(URI.create(hdfs_path), conf);
InputStream in = new BufferedInputStream(new FileInputStream(file));//要追加的文件流,file为文件
OutputStream out = fs.append(new Path(hdfs_path));
IOUtils.copyBytes(in, out, 4096, true);

LoveJW 发表于 2014-11-21 16:37:56

sstutu 发表于 2014-11-21 16:35
本帖最后由 sstutu 于 2014-11-21 16:36 编辑
楼主两个参数有误,导致系统认为是重新创建文件

能详细的说下怎么改吗??

LoveJW 发表于 2014-11-21 16:55:04

sstutu 发表于 2014-11-21 16:35
本帖最后由 sstutu 于 2014-11-21 16:36 编辑
楼主两个参数有误,导致系统认为是重新创建文件

{:soso_e154:}大哥,能说说吗??

LoveJW 发表于 2014-11-21 18:33:47

本帖最后由 LoveJW 于 2014-11-21 18:36 编辑

sstutu 发表于 2014-11-21 17:19
可以尝试下,不过你先贴出代码来,让我一个个敲出来,唉



public boolean appendDataToFile(String table, String data) {
      boolean result = false;
      try {
            if(StringUtils.isBlank(table)||StringUtils.isBlank(data)){
                throw new NullPointerException("appendDataToFile:table or data is null!");
            }
            Path filePath = new Path(host + path + table + "/part-m-00000");
            Configuration conf = new Configuration();
            FileSystem fs = filePath.getFileSystem(conf);
            fs.setReplication(filePath, (short) 3);
            OutputStream out = fs.append(filePath);
            InputStream in = new BufferedInputStream(new ByteArrayInputStream(data.getBytes()));
            IOUtils.copyBytes(in, out, conf);
//            fs.close();
//            IOUtils.closeStream(in);
            result = true;
            

      } catch (Exception e) {
            log.warn("appendDataToFile:data append error",e);
      }   
      return result;
    }

我就是要直接把字符串存过去的啊,Hdfs上有建好的文件,就是要实现追加功能,那两个错误参数是哪两个??



LoveJW 发表于 2014-11-25 15:02:47

sstutu 发表于 2014-11-21 17:19
可以尝试下,不过你先贴出代码来,让我一个个敲出来,唉




上面那个问题能再帮忙看看吗??

desehawk 发表于 2014-11-25 18:58:20

LoveJW 发表于 2014-11-25 15:02
上面那个问题能再帮忙看看吗??




建议改成这种格式
public class PutMerge {


/**
* @param args
*            void
* @throws IOException
*/
public static void main(String[] args) throws IOException {
// TODO Auto-generated method stub
   Configuration conf = new Configuration();
      FileSystem hdfs = FileSystem.get(conf);
      FileSystem local = FileSystem.getLocal(conf);
      
      Path inputDir = new Path("/home/du/inout");
      Path hdfsFile = new Path("/myhdfs");
      
      try
      {
            FileStatus[] inputFiles = local.listStatus(inputDir);
            FSDataOutputStream out = hdfs.append(hdfsFile);
            
            for(int i = 0; i < inputFiles.length; i++ )
            {
                System.out.println(inputFiles.getPath().getName());
                FSDataInputStream in = local.open(inputFiles.getPath());
                byte buffer[] = new byte;
                int bytesRead = 0;
                while( (bytesRead = in.read(buffer)) > 0)
                {
                  out.write(buffer, 0, bytesRead);
                }
                in.close();
            }
            out.close();
      }
      catch (IOException e)
      {
            e.printStackTrace();
      }
}


}


desehawk 发表于 2014-11-25 19:00:03

LoveJW 发表于 2014-11-21 18:33
public boolean appendDataToFile(String table, String data) {
      boolean result = false;
    ...



楼主你调试了,确定文件名称已经存在。建议使用简单的已经存在的名字。

LoveJW 发表于 2014-11-25 19:54:48

desehawk 发表于 2014-11-25 19:00
楼主你调试了,确定文件名称已经存在。建议使用简单的已经存在的名字。

确定啊,我写的绝对路径 调试看的路径也对,但是就是报新建错误,我没新建的操作啊
页: [1] 2
查看完整版本: 在eclipse操作数据到HDFS