import pyhdfs from pyhdfs import HdfsException import os import telnetlib as tn from flask import Flask,request,Response from gevent.pywsgi import WSGIServer import werkzeug ''' python 操作HDFS。 1.访问文件系统列表(目录、文件) FileStatus(accessTime=0, blockSize=0, childrenNum=15, fileId=16395, group='supergroup', length=0, modificationTime=1572431013356, owner='hdfs', pathSuffix='.cloudera_health_monitoring_canary_files', permission='0', replication=0, storagePolicy=0, type='DIRECTORY'/'FILE') 2.读取文件内容 3.下载HDFS文件到本地 ''' namenodes = 'ip:9870,ip:9870' file_type = 'FILE' dir_type = 'DIRECTORY' app = Flask(__name__) @app.route('/uploadBytes',methods=['POST']) def upload_bytes(): method = request.method if method != 'GET' and method != 'POST': return Response({0: '请使用GET或POST请求!'}) # 获取request请求信息 # args = request.args #args为路径中的参数。 file = request.files['file'] dicts = request.form.to_dict() dir = dicts['dirPath'] nodes = dicts['nameNodes'] filename = dicts['fileName'] # 上传到hdfs myhdfs = MyHdfs(nodes) result = myhdfs.upload_to_hdfs2(file, filename, dir) return Response(str(result)) @app.route('/upload',methods=['POST']) def upload(): method = request.method if method != 'GET' and method != 'POST': return Response({0:'请使用GET或POST请求!'}) #获取request请求信息 #args = request.args #args为路径中的参数。 file = request.files['file'] dicts = request.form.to_dict() dir = dicts['dirPath'] nodes = dicts['nameNodes'] #上传到hdfs myhdfs = MyHdfs(nodes) result = myhdfs.upload_to_hdfs(file,dir) return Response(str(result)) @app.route('/download',methods=['GET']) def download(): if request.method != 'GET': return Response({0:'请使用GET请求!'}) args = request.args nodes = args.get("nameNodes") file = args.get("filePath") # 连接到hdfs myhdfs = MyHdfs(nodes) result = myhdfs.get_hdfs_filestream(file) if type(result) == dict: return Response(str(result)) return Response(result) @app.route('/read',methods=['GET']) def readfile(): if request.method != 'GET': return Response({0:'请使用GET请求!'}) args = request.args nodes = args.get("nameNodes") file = args.get("filePath") lines = args.get("readLines") # 连接到hdfs myhdfs = MyHdfs(nodes) result = myhdfs.read_hdfs_file(file,lines) return Response(str(result)) @app.route('/delete',methods=['GET']) def delfile(): if request.method != 'GET': return Response({0:'请使用GET请求!'}) args = request.args nodes = args.get("nameNodes") file = args.get("filePath") # 连接到hdfs myhdfs = MyHdfs(nodes) result = myhdfs.delete_file(file) return Response(str(result)) class PathInfo(object): """存储路径(目录、文件)信息""" def __init__(self, name, pathInfos=[]): #pathInfo自身的路径名称 self.name = name #pathInfo下一级的路径名称 self.pathInfos = pathInfos def __str__(self): return "{'name':'%s','sub':%s}"%(self.name,self.pathInfos) class MyHdfsException(HdfsException): def __init__(self, value="请检查主机、端口的正确性!"): self.value = value def raise_exception(self): try: raise MyHdfsException finally: print(self.value) class MyHdfs(object): def __init__(self, hosts,user_name='hdfs'): self.hosts = hosts self.username = user_name self.fs = pyhdfs.HdfsClient(self.hosts,self.username) self.__check_server_available() def __check_server_available(self): if len(self.hosts.strip())==0: MyHdfsException().raise_exception() try: host_list = str(self.hosts).split(",") for host in host_list: ip,port = host.split(":") tn.Telnet(ip, port) except: MyHdfsException().raise_exception() else: print('IP和端口可用!') def upload_to_hdfs(self,data,dirPath="/"): """ 上传文件到HDFS,默认上传到HDFS根目录。 :param data 需要上传的数据。(``bytes`` or a ``file``-like object) :param dirPath HDFS目录。(可以是已经存在的目录,也可以是不存在的) :return: 上传成功1,上传失败0 """ try: # 获取文件名 filename = data.filename print("filename: ", filename) #拼接文件路径 filePath = '' if dirPath.endswith("/"): filePath = dirPath + filename else: filePath = dirPath + '/' + filename #检查文件是否存在 if self.fs.exists(filePath): print("文件已存在!") return {0:"文件已存在!"} else: # 创建文件 print("开始上传文件...") self.fs.create(filePath, data, overwrite=False, buffersize=2048) print("上传文件完成...") return {1:"成功上传文件!"} except: #MyHdfsException("上传文件失败!").raise_exception() print("上传文件失败!") return {0:"上传文件失败!"} def upload_to_hdfs2(self,data,filename,dirPath="/"): """ 上传文件到HDFS,默认上传到HDFS根目录。 :param data 需要上传的数据。(``bytes`` or a ``file``-like object) :param fileName 文件名 :param dirPath HDFS目录。(可以是已经存在的目录,也可以是不存在的) :return: 上传成功1,上传失败0 """ try: #获取文件名 print("filename: ", filename) #拼接文件路径 filePath = '' if dirPath.endswith("/"): filePath = dirPath + filename else: filePath = dirPath + '/' + filename #检查文件是否存在 if self.fs.exists(filePath): print("文件已存在!") return {0:"文件已存在!"} else: # 创建文件 print("开始上传文件...") self.fs.create(filePath, data, overwrite=False,buffersize=2048) print("上传文件完成...") return {1:"成功上传文件!"} except: #MyHdfsException("上传文件失败!").raise_exception() print("上传文件失败!") return {0:"上传文件失败!"} def get_hdfs_filestream(self,filePath): """ 获取指定文件的IO流 :param filePath: HDFS文件的路径 :return: 1表示获取文件流成功;0表示有问题 """ try: check_result = self.__check_hdfs_path(filePath,file_type) if not check_result: return {0: "请指定文件路径!"} # 返回文件流:urllib3.response.HTTPResponse return self.fs.open(filePath) except MyHdfsException: return {0:"文件路径不存在!"} def download_to_local(self,hdfsSource,localDest): """ 将HDFS上的文件下载到本地指定目录 :param hdfsSource: HDFS上的文件路径 :param localDest: 本地文件/目录路径 ###param over_write:是否覆盖已经存在的文件 :return: 提示信息,或下载成功的文件数量n或失败0 """ # 判断是否是文件 if self.__check_hdfs_path(hdfsSource,file_type): #检查本地目录:如果不存在,就创建 localDest,localDest_type = self.check_local_path(localDest) # 直接下载 if localDest_type == dir_type: idx = hdfsSource.rindex("/") localDest = localDest + "/" + hdfsSource[idx:] self.fs.copy_to_local(hdfsSource, localDest) return 1 else: print("HDFS路径必须是文件路径!") return 0 def __check_hdfs_path(self,hdfsPath,isFileOrDir): """ 判断给定的HDFS是否存在,如果存在,是文件还是目录。 :param hdfsPath: 路径 :param isFileOrDir: 文件或目录,取值为'DIRECTORY'或'FILE' :return: """ if (not self.fs.exists(hdfsPath)): MyHdfsException("路径(%s)不存在!" % (hdfsPath)).raise_exception() file_status = self.fs.get_file_status(hdfsPath) return file_status.type == isFileOrDir def check_local_path(self,givenpath): """ 判断文件或目录是否存在,不存在就创建目录 :param givenpath: :return: 返回给定的givenpath是文件还是目录 """ os_path = os.path #当前位置的名称 base_name = os_path.basename(givenpath) #上一级路径 dir_name = os_path.dirname(givenpath) existFlag = os_path.exists(givenpath) #如果givenpath是以斜杠结尾,就去掉斜杠 if givenpath.endswith("//") : idx = str(givenpath).rindex("//") givenpath = str(givenpath)[:idx] if givenpath.endswith("/") : idx = str(givenpath).rindex("/") givenpath = str(givenpath)[:idx] if base_name.find(".") == -1: # 如果是目录且不存在,就创建 if not existFlag: # 创建目录 os.makedirs(givenpath) return givenpath,dir_type else: #如果是文件就检查上一级目录是否创建 flag = os_path.exists(dir_name) if not flag: os.makedirs(dir_name) return givenpath,file_type def delete_file(self,filePath): """ 删除指定的HDFS文件。 :param filePath: 文件路径 :return: 1表示删除成功,0删除失败。 """ try: check_result = self.__check_hdfs_path(filePath,file_type) if not check_result: return {0: "请指定文件路径!"} except MyHdfsException: return {0:"文件路径不存在!"} #true if delete is successful else false. del_result = self.fs.delete(filePath) if del_result: return {1:"删除成功"} else: return {0:"删除失败"} def read_hdfs_file(self,path,lines=100): """ 读取给定的HDFS文件 :param path: 文件路径 :param lines:需要读取的行数 :return: 1表示成功读取到的字节流序列。0表示读取有问题。 """ if (not self.fs.exists(path)): #raise RuntimeError("路径(%s)不存在!" % (path)) return {0:"路径(%s)不存在!" % (path)} file_status = self.fs.get_file_status(path) #判断是否是文件 if (file_status.type==file_type): resp = self.fs.open(path) #urllib3.response.HTTPResponse #resp.flush() line_bytes = resp.readline() #bytes #记录读取的行数 lines = int(lines) lines -= 1 list_bytes = [] while len(line_bytes)>0 and lines>=0: #bytes==>str list_bytes.append(line_bytes.decode("ISO-8859-1").strip()) line_bytes = resp.readline() lines -= 1 resp.close() return {1:list_bytes} else: #raise RuntimeError("请输入文件路径!") return {0:"请输入文件路径!"} def get_subpath(self,pathInfo=PathInfo('/',[]),recursive=False): """ (循环)遍历FS中指定路径下的内容:文件或目录 :param path: 封装路径信息的对象 :param recursive: 是否需要循环遍历完给定路径下所有的子路径 :return:路径信息 """ self_path = pathInfo.name sub_pathes = pathInfo.pathInfos if(not self.fs.exists(self_path)): return "路径(%s)不存在!"%(self_path) statuses = self.fs.list_status(self_path) if (not recursive): # path.pathInfos = fs.listdir(self_path) if (self_path == "/"): pathInfo.pathInfos = [self_path+i for i in self.fs.listdir(self_path)] else: pathInfo.pathInfos = [self_path +"/" + i for i in self.fs.listdir(self_path)] else: for status in statuses: currentPathSuffix = status.pathSuffix # 转成详细的路径 if (self_path == "/"): currentPathSuffix = self_path + currentPathSuffix else: currentPathSuffix = self_path + "/" + currentPathSuffix if (status.type == dir_type): # 如果是目录,就封装到对象并继续遍历 tmp = PathInfo(currentPathSuffix, []) sub_pathes.append(tmp) self.get_subpath(tmp, recursive) else: # 如果是文件,就直接返回 sub_pathes.append(currentPathSuffix) pathInfo.pathInfos = sub_pathes if __name__ == '__main__': # result = PathInfo('/', []) # get_subpath(result,True) #print(result) #path = "/user/yarn/mapreduce/mr-framework" #path = "/testdata" #hdfspath="/testdata/2013_trip_data_test.csv" # # #path="/testdata/北京大兴机场.jpg" # list = [i.decode("utf-8") for i in read_hdfs_file(hdfspath,3)] # print(list) #download_to_local(path,"d:/tg/gaohan.csv") #path="d:/datas/gh/tg" #print(download_to_local(hdfspath, path)) # myhdfs = MyHdfs(namenodes) # myhdfs.download_to_local(hdfspath, path) #app.run() http_server = WSGIServer(("0.0.0.0",5010),app) http_server.serve_forever()
热门文章
- 2月9日免费VPN节点 | 18M/S|2025年Shadowrocket/V2ray/SSR/Clash免费订阅链接地址
- 2月26日免费VPN节点 | 21.8M/S|2025年Shadowrocket/V2ray/SSR/Clash免费订阅链接地址
- 广州市宠物领养平台(广州宠物领养微信群)
- 南京宠物领养中心有哪些地址 南京宠物领养中心有哪些地址和电话
- 做狗粮的机器多少钱一台(狗粮加工厂投资多少钱)
- 合肥宠物救助站电话 合肥宠物救助站电话号码
- 2月13日免费VPN节点 | 18.6M/S|2025年Clash/Shadowrocket/V2ray/SSR免费订阅链接地址
- vue – vue基础/vue核心内容(2)_在线工具
- 免费领养狗狗网站正规有哪些(免费领养狗狗免费领养宠物的网站)
- 2月17日免费VPN节点 | 21.9M/S|2025年SSR/V2ray/Shadowrocket/Clash免费订阅链接地址