import pyhdfs from pyhdfs import HdfsException import os import telnetlib  as tn from flask import Flask,request,Response from gevent.pywsgi import WSGIServer import werkzeug  ''' python 操作HDFS。 1.访问文件系统列表(目录、文件) FileStatus(accessTime=0,             blockSize=0,             childrenNum=15,             fileId=16395,             group='supergroup',             length=0,             modificationTime=1572431013356,             owner='hdfs',             pathSuffix='.cloudera_health_monitoring_canary_files',             permission='0',             replication=0,             storagePolicy=0,             type='DIRECTORY'/'FILE') 2.读取文件内容 3.下载HDFS文件到本地 '''  namenodes = 'ip:9870,ip:9870' file_type = 'FILE' dir_type = 'DIRECTORY'  app = Flask(__name__)  @app.route('/uploadBytes',methods=['POST']) def upload_bytes():     method = request.method     if method != 'GET' and method != 'POST':         return Response({0: '请使用GET或POST请求!'})     # 获取request请求信息     # args = request.args #args为路径中的参数。     file = request.files['file']     dicts = request.form.to_dict()     dir = dicts['dirPath']     nodes = dicts['nameNodes']     filename = dicts['fileName']     # 上传到hdfs     myhdfs = MyHdfs(nodes)     result = myhdfs.upload_to_hdfs2(file, filename, dir)     return Response(str(result))  @app.route('/upload',methods=['POST']) def upload():     method = request.method     if  method != 'GET' and method != 'POST':         return Response({0:'请使用GET或POST请求!'})     #获取request请求信息     #args = request.args #args为路径中的参数。     file = request.files['file']     dicts = request.form.to_dict()     dir = dicts['dirPath']     nodes = dicts['nameNodes']     #上传到hdfs     myhdfs = MyHdfs(nodes)     result =  myhdfs.upload_to_hdfs(file,dir)     return Response(str(result))  @app.route('/download',methods=['GET']) def download():     if request.method != 'GET':         return Response({0:'请使用GET请求!'})     args = request.args     nodes = args.get("nameNodes")     file = args.get("filePath")     # 连接到hdfs     myhdfs = MyHdfs(nodes)     result = myhdfs.get_hdfs_filestream(file)     if type(result) == dict:         return Response(str(result))     return Response(result) @app.route('/read',methods=['GET']) def readfile():     if request.method != 'GET':         return Response({0:'请使用GET请求!'})     args = request.args     nodes = args.get("nameNodes")     file = args.get("filePath")     lines = args.get("readLines")     # 连接到hdfs     myhdfs = MyHdfs(nodes)     result = myhdfs.read_hdfs_file(file,lines)     return Response(str(result))  @app.route('/delete',methods=['GET']) def delfile():     if request.method != 'GET':         return Response({0:'请使用GET请求!'})     args = request.args     nodes = args.get("nameNodes")     file = args.get("filePath")     # 连接到hdfs     myhdfs = MyHdfs(nodes)     result = myhdfs.delete_file(file)     return Response(str(result))  class PathInfo(object):     """存储路径(目录、文件)信息"""     def __init__(self, name, pathInfos=[]):         #pathInfo自身的路径名称         self.name = name         #pathInfo下一级的路径名称         self.pathInfos = pathInfos     def __str__(self):         return "{'name':'%s','sub':%s}"%(self.name,self.pathInfos)  class MyHdfsException(HdfsException):     def __init__(self, value="请检查主机、端口的正确性!"):         self.value = value     def raise_exception(self):         try:             raise MyHdfsException         finally:             print(self.value)  class MyHdfs(object):     def __init__(self, hosts,user_name='hdfs'):         self.hosts = hosts         self.username = user_name         self.fs = pyhdfs.HdfsClient(self.hosts,self.username)         self.__check_server_available()      def __check_server_available(self):         if len(self.hosts.strip())==0:             MyHdfsException().raise_exception()         try:             host_list = str(self.hosts).split(",")             for host in host_list:                 ip,port = host.split(":")                 tn.Telnet(ip, port)         except:             MyHdfsException().raise_exception()         else:             print('IP和端口可用!')     def upload_to_hdfs(self,data,dirPath="/"):         """         上传文件到HDFS,默认上传到HDFS根目录。         :param data 需要上传的数据。(``bytes`` or a ``file``-like object)         :param dirPath HDFS目录。(可以是已经存在的目录,也可以是不存在的)         :return: 上传成功1,上传失败0         """         try:             # 获取文件名             filename = data.filename             print("filename: ", filename)             #拼接文件路径             filePath = ''             if dirPath.endswith("/"):                 filePath = dirPath + filename             else:                 filePath = dirPath + '/' + filename             #检查文件是否存在             if self.fs.exists(filePath):                 print("文件已存在!")                 return {0:"文件已存在!"}             else:                 # 创建文件                 print("开始上传文件...")                 self.fs.create(filePath, data, overwrite=False, buffersize=2048)                 print("上传文件完成...")                 return {1:"成功上传文件!"}         except:             #MyHdfsException("上传文件失败!").raise_exception()             print("上传文件失败!")             return {0:"上传文件失败!"}     def upload_to_hdfs2(self,data,filename,dirPath="/"):         """         上传文件到HDFS,默认上传到HDFS根目录。         :param data 需要上传的数据。(``bytes`` or a ``file``-like object)         :param fileName 文件名         :param dirPath HDFS目录。(可以是已经存在的目录,也可以是不存在的)         :return: 上传成功1,上传失败0         """         try:             #获取文件名             print("filename: ", filename)             #拼接文件路径             filePath = ''             if dirPath.endswith("/"):                 filePath = dirPath + filename             else:                 filePath = dirPath + '/' + filename             #检查文件是否存在             if self.fs.exists(filePath):                 print("文件已存在!")                 return {0:"文件已存在!"}             else:                 # 创建文件                 print("开始上传文件...")                 self.fs.create(filePath, data, overwrite=False,buffersize=2048)                 print("上传文件完成...")                 return {1:"成功上传文件!"}         except:             #MyHdfsException("上传文件失败!").raise_exception()             print("上传文件失败!")             return {0:"上传文件失败!"}      def get_hdfs_filestream(self,filePath):         """         获取指定文件的IO流         :param filePath:  HDFS文件的路径         :return: 1表示获取文件流成功;0表示有问题         """         try:             check_result = self.__check_hdfs_path(filePath,file_type)             if not check_result:                 return {0: "请指定文件路径!"}             # 返回文件流:urllib3.response.HTTPResponse             return self.fs.open(filePath)         except MyHdfsException:             return {0:"文件路径不存在!"}      def download_to_local(self,hdfsSource,localDest):         """         将HDFS上的文件下载到本地指定目录         :param hdfsSource: HDFS上的文件路径         :param localDest: 本地文件/目录路径         ###param over_write:是否覆盖已经存在的文件         :return: 提示信息,或下载成功的文件数量n或失败0         """         # 判断是否是文件         if self.__check_hdfs_path(hdfsSource,file_type):             #检查本地目录:如果不存在,就创建             localDest,localDest_type = self.check_local_path(localDest)             # 直接下载             if localDest_type == dir_type:                 idx = hdfsSource.rindex("/")                 localDest = localDest + "/" + hdfsSource[idx:]             self.fs.copy_to_local(hdfsSource, localDest)             return 1         else:             print("HDFS路径必须是文件路径!")             return 0      def __check_hdfs_path(self,hdfsPath,isFileOrDir):         """         判断给定的HDFS是否存在,如果存在,是文件还是目录。         :param hdfsPath:  路径         :param isFileOrDir: 文件或目录,取值为'DIRECTORY'或'FILE'         :return:         """         if (not self.fs.exists(hdfsPath)):             MyHdfsException("路径(%s)不存在!" % (hdfsPath)).raise_exception()         file_status = self.fs.get_file_status(hdfsPath)         return file_status.type == isFileOrDir      def check_local_path(self,givenpath):         """         判断文件或目录是否存在,不存在就创建目录         :param givenpath:         :return: 返回给定的givenpath是文件还是目录         """         os_path = os.path         #当前位置的名称         base_name = os_path.basename(givenpath)         #上一级路径         dir_name = os_path.dirname(givenpath)         existFlag = os_path.exists(givenpath)         #如果givenpath是以斜杠结尾,就去掉斜杠         if givenpath.endswith("//") :             idx = str(givenpath).rindex("//")             givenpath = str(givenpath)[:idx]         if givenpath.endswith("/") :             idx = str(givenpath).rindex("/")             givenpath = str(givenpath)[:idx]         if base_name.find(".") == -1:             # 如果是目录且不存在,就创建             if not existFlag:                 # 创建目录                 os.makedirs(givenpath)             return givenpath,dir_type         else:             #如果是文件就检查上一级目录是否创建             flag = os_path.exists(dir_name)             if not flag:                 os.makedirs(dir_name)             return givenpath,file_type      def delete_file(self,filePath):         """         删除指定的HDFS文件。         :param filePath: 文件路径         :return: 1表示删除成功,0删除失败。         """         try:             check_result = self.__check_hdfs_path(filePath,file_type)             if not check_result:                 return {0: "请指定文件路径!"}         except MyHdfsException:             return {0:"文件路径不存在!"}         #true if delete is successful else false.         del_result = self.fs.delete(filePath)         if del_result:             return {1:"删除成功"}         else:             return {0:"删除失败"}      def read_hdfs_file(self,path,lines=100):         """         读取给定的HDFS文件         :param path: 文件路径         :param lines:需要读取的行数         :return: 1表示成功读取到的字节流序列。0表示读取有问题。         """         if (not self.fs.exists(path)):             #raise RuntimeError("路径(%s)不存在!" % (path))             return {0:"路径(%s)不存在!" % (path)}         file_status = self.fs.get_file_status(path)         #判断是否是文件         if (file_status.type==file_type):             resp = self.fs.open(path) #urllib3.response.HTTPResponse             #resp.flush()             line_bytes = resp.readline() #bytes             #记录读取的行数             lines = int(lines)             lines -= 1             list_bytes = []             while len(line_bytes)>0 and lines>=0:                  #bytes==>str                 list_bytes.append(line_bytes.decode("ISO-8859-1").strip())                 line_bytes = resp.readline()                 lines -= 1             resp.close()             return {1:list_bytes}         else:             #raise RuntimeError("请输入文件路径!")             return {0:"请输入文件路径!"}      def get_subpath(self,pathInfo=PathInfo('/',[]),recursive=False):         """         (循环)遍历FS中指定路径下的内容:文件或目录         :param path: 封装路径信息的对象         :param recursive: 是否需要循环遍历完给定路径下所有的子路径         :return:路径信息         """         self_path = pathInfo.name         sub_pathes = pathInfo.pathInfos         if(not self.fs.exists(self_path)):             return "路径(%s)不存在!"%(self_path)         statuses = self.fs.list_status(self_path)         if (not recursive):             # path.pathInfos = fs.listdir(self_path)             if (self_path == "/"):                 pathInfo.pathInfos = [self_path+i for i in self.fs.listdir(self_path)]             else:                 pathInfo.pathInfos = [self_path +"/" + i for i in self.fs.listdir(self_path)]         else:             for status in statuses:                 currentPathSuffix = status.pathSuffix                 # 转成详细的路径                 if (self_path == "/"):                     currentPathSuffix = self_path + currentPathSuffix                 else:                     currentPathSuffix = self_path + "/" + currentPathSuffix                 if (status.type == dir_type):                     # 如果是目录,就封装到对象并继续遍历                     tmp = PathInfo(currentPathSuffix, [])                     sub_pathes.append(tmp)                     self.get_subpath(tmp, recursive)                 else:                     # 如果是文件,就直接返回                     sub_pathes.append(currentPathSuffix)                 pathInfo.pathInfos = sub_pathes  if __name__ == '__main__':     # result = PathInfo('/', [])     # get_subpath(result,True)     #print(result)     #path = "/user/yarn/mapreduce/mr-framework"     #path = "/testdata"     #hdfspath="/testdata/2013_trip_data_test.csv" #     # #path="/testdata/北京大兴机场.jpg"     # list = [i.decode("utf-8") for i in read_hdfs_file(hdfspath,3)]     # print(list)      #download_to_local(path,"d:/tg/gaohan.csv")     #path="d:/datas/gh/tg"     #print(download_to_local(hdfspath, path))      # myhdfs = MyHdfs(namenodes)     # myhdfs.download_to_local(hdfspath, path)     #app.run()     http_server = WSGIServer(("0.0.0.0",5010),app)     http_server.serve_forever()