Python Revisited Day 07 (文件处理)
[TOC]
《Python 3 程序開發指南》 學習筆記
import datetime


class IncidentError(Exception):
    """Raised when incident data is missing or cannot be parsed."""


class Incident:
    """Record describing a single aviation incident.

    Every attribute except ``pilot_total_hours`` is exposed through a
    property whose setter validates the value with ``assert``;
    ``report_id`` is read-only once the object has been created.

    >>> kwargs = dict(report_id="2007061289X")
    >>> kwargs["date"] = datetime.date(2007, 6, 12)
    >>> kwargs["airport"] = "Los Angeles"
    >>> kwargs["aircraft_id"] = "8184XK"
    >>> kwargs["aircraft_type"] = "CVS91"
    >>> kwargs["pilot_percent_hours_on_type"] = 17.5
    >>> kwargs["pilot_total_hours"] = 1258
    >>> kwargs["midair"] = False
    >>> incident = Incident(**kwargs)
    >>> incident.report_id, incident.date, incident.airport
    ('2007061289X', datetime.date(2007, 6, 12), 'Los Angeles')
    >>> incident.aircraft_id, incident.aircraft_type
    ('8184XK', 'CVS91')
    >>> incident.pilot_percent_hours_on_type
    17.5
    >>> incident.pilot_total_hours, incident.midair
    (1258, False)
    >>> incident.midair = 1
    Traceback (most recent call last):
    ...
    AssertionError: invalid midair
    >>> incident.pilot_percent_hours_on_type = -1
    Traceback (most recent call last):
    ...
    AssertionError: invalid pilot_percent_hours_on_type
    """

    def __init__(self, report_id, date, airport, aircraft_id,
                 aircraft_type, pilot_percent_hours_on_type,
                 pilot_total_hours, midair, narrative=""):
        """
        :param report_id: str Minimum length 8 and no whitespace
        :param date: datetime.date
        :param airport: str Nonempty and no newlines
        :param aircraft_id: str Nonempty and no newlines
        :param aircraft_type: str Nonempty and no newlines
        :param pilot_percent_hours_on_type: float Range 0.0 to 100.0
        :param pilot_total_hours: int Positive and nonzero
        :param midair: bool
        :param narrative: str Multiline
        :raises AssertionError: if a validated argument is out of range
        """
        assert len(report_id) >= 8 and len(report_id.split()) == 1, \
            "invalid report ID"
        self.__report_id = report_id
        # The assignments below go through the property setters, so the
        # same validation applies at construction and on later updates.
        self.date = date
        self.airport = airport
        self.aircraft_id = aircraft_id
        self.aircraft_type = aircraft_type
        self.pilot_percent_hours_on_type = pilot_percent_hours_on_type
        # NOTE(review): stored as a plain attribute; the documented
        # "positive and nonzero" constraint is not enforced here.
        self.pilot_total_hours = pilot_total_hours
        self.midair = midair
        self.narrative = narrative

    @property
    def report_id(self):
        """The report identifier (read-only)."""
        return self.__report_id

    @property
    def date(self):
        """The date of the incident as a datetime.date."""
        return self.__date

    @date.setter
    def date(self, date):
        assert isinstance(date, datetime.date), "invalid date"
        self.__date = date

    @property
    def airport(self):
        """The airport name: nonempty, without newlines."""
        return self.__airport

    @airport.setter
    def airport(self, airport):
        assert airport and '\n' not in airport, "invalid airport"
        self.__airport = airport

    @property
    def aircraft_id(self):
        """The aircraft identifier: nonempty, without newlines."""
        return self.__aircraft_id

    @aircraft_id.setter
    def aircraft_id(self, aircraft_id):
        assert aircraft_id and "\n" not in aircraft_id, \
            "invalid aircraft_id"
        self.__aircraft_id = aircraft_id

    @property
    def aircraft_type(self):
        """The aircraft type: nonempty, without newlines."""
        return self.__aircraft_type

    @aircraft_type.setter
    def aircraft_type(self, aircraft_type):
        assert aircraft_type and "\n" not in aircraft_type, \
            "invalid aircraft_type"
        self.__aircraft_type = aircraft_type

    @property
    def pilot_percent_hours_on_type(self):
        """Percentage of the pilot's hours flown on this aircraft type."""
        return self.__pilot_percent_hours_on_type

    @pilot_percent_hours_on_type.setter
    def pilot_percent_hours_on_type(self, pilot_percent_hours_on_type):
        assert 0.0 <= pilot_percent_hours_on_type <= 100.0, \
            "invalid pilot_percent_hours_on_type"
        self.__pilot_percent_hours_on_type = pilot_percent_hours_on_type

    @property
    def midair(self):
        """Whether the incident involved another aircraft"""
        return self.__midair

    @midair.setter
    def midair(self, midair):
        assert isinstance(midair, bool), "invalid midair"
        self.__midair = midair

    @property
    def narrative(self):
        """The incident's narrative"""
        return self.__narrative

    @narrative.setter
    def narrative(self, narrative):
        # No validation: the narrative may be empty or span several lines.
        self.__narrative = narrative


class IncidentCollection(dict):
    """Dictionary of incidents keyed by report ID.

    Inherits from dict and overrides the iteration methods so that keys,
    values and items are always produced in sorted key order.
    """

    def values(self):
        for report_id in self.keys():
            yield self[report_id]

    def items(self):
        for report_id in self.keys():
            yield (report_id, self[report_id])

    def __iter__(self):
        # super().keys() is required: self.keys is an alias of this very
        # method, so calling it here would recurse forever.
        for report_id in sorted(super().keys()):
            yield report_id

    keys = __iter__


if __name__ == "__main__":
    import doctest
    doctest.testmod()

# 7.1 二進制數據的讀與寫  (article section heading: reading and writing binary data)
7.1.1 帶可選壓縮的Pickle
pickle沒有安全機制,因此,加載來自不可信源的pickle可能是危險的。 pickle可以導入任意模塊并調用任意函數,因此來自不可信源的Pickle中的數據可能會被惡意操縱。
# `self` is a dict whose values are Incident objects; pickle can serialise
# instances of custom classes automatically.
def export_pickle(self, filename, compress=False):
    """Save the incident collection to *filename* as a pickle.

    :param filename: path of the file to write
    :param compress: when true, the pickle is written through gzip
    :return: True on success, False if writing or pickling failed
    """
    opener = gzip.open if compress else open
    try:
        with opener(filename, "wb") as fh:
            pickle.dump(self, fh, pickle.HIGHEST_PROTOCOL)
        return True
    except (EnvironmentError, pickle.PicklingError) as err:
        print("{0}: export error:{1}".format(
            os.path.basename(sys.argv[0]), err))
        return False


GZIP_MAGIC = b"\x1F\x8B"  # magic number at the start of a gzip file


def import_pickle(self, filename):
    """Replace the collection's contents with the pickle in *filename*.

    The file may be plain or gzip-compressed; compression is detected by
    comparing the first bytes with GZIP_MAGIC (b"\\x1F\\x8B").

    SECURITY: pickle.load can import modules and call arbitrary
    functions, so only load files from trusted sources.

    :param filename: path of the file to read
    :return: True on success, False if the file could not be read or
        unpickled (the existing contents are then left untouched)
    """
    try:
        with open(filename, "rb") as fh:
            magic = fh.read(len(GZIP_MAGIC))
        opener = gzip.open if magic == GZIP_MAGIC else open
        with opener(filename, "rb") as fh:
            loaded = pickle.load(fh)
        # Replace the contents only after a successful load, so a corrupt
        # file does not wipe the data already held.
        self.clear()
        self.update(loaded)
        return True
    except (EnvironmentError, EOFError, pickle.UnpicklingError) as err:
        print("{0}: import error: {1}".format(
            os.path.basename(sys.argv[0]), err))
        return False

# 可pickled 的類型  (article section heading: picklable types)
布爾型、數值型以及字符串都可以pickled,類(包括自定義類)的實例也可以pickled,前提是其私有的__dict__是picklable。此外,內置的組合類型也能pickled。
bytes and bytearray (表)
bytes 屬于不可變類型,bytearray 則是可變的
b1 = "小剛的藍色水筆" b2 = b1.encode() b2 #b'\xe5\xb0\x8f\xe5\x88\x9a\xe7\x9a\x84\xe8\x93\x9d\xe8\x89\xb2\xe6\xb0\xb4\xe7\xac\x94' b2.decode() # "小剛的藍色水筆" b1 = "小剛的藍色水筆" b2 = bytearray(b1.encode()) for i in b2:print(type(i), i, hex(i)) b2 """從這里就可以明白那一堆東西是啥了 就是16進制編碼?""" <class 'int'> 229 0xe5 <class 'int'> 176 0xb0 <class 'int'> 143 0x8f <class 'int'> 229 0xe5 <class 'int'> 136 0x88 <class 'int'> 154 0x9a <class 'int'> 231 0xe7 <class 'int'> 154 0x9a <class 'int'> 132 0x84 <class 'int'> 232 0xe8 <class 'int'> 147 0x93 <class 'int'> 157 0x9d <class 'int'> 232 0xe8 <class 'int'> 137 0x89 <class 'int'> 178 0xb2 <class 'int'> 230 0xe6 <class 'int'> 176 0xb0 <class 'int'> 180 0xb4 <class 'int'> 231 0xe7 <class 'int'> 172 0xac <class 'int'> 148 0x94 bytearray(b'\xe5\xb0\x8f\xe5\x88\x9a\xe7\x9a\x84\xe8\x93\x9d\xe8\x89\xb2\xe6\xb0\xb4\xe7\xac\x94') """漢字是3個8位?""" b1 = "小剛的藍色水筆" b2 = bytearray(b1.encode()) b2_1 = "小紅".encode() b2_2 = "粉色".encode() b2[:6] = b2_1 b2[9:15] = b2_2 b2.decode() #“小紅的粉色水筆” b = "ABCDE".encode() b1= "A".encode() b[0], b[:1] # (65, b'A') b[0] == b1 # False b[0] == b1[0] # True b[:1] == b1 # True| ba.append(i) | 將整數i(0~255)附加到bytearray ba中 |
| b.capitalize() | 返回bytes/bytearray b 的副本,并且第一個字符變為大寫(如果是一個ASCII字符) |
| b.center(width, byte) | 返回b的副本,b在長度為width的區域中間,并使用空格或給定的byte填充 |
| b.count(x, start, end) | 返回bytes/bytearray x在bytes/bytearray b(或切片)中出現的次數 |
| b.decode(encoding, error) | 返回一個str對象,代表使用UTF-8編碼表示的(或指定encoding表示并根據可選的error參數進行錯誤處理)字節 |
| b.endswith(x, start, end) | 如果b(或b的start:end分片)以bytes/bytearray x或元組x中任意bytes/bytearray結尾,就返回True,否則返回False |
| b.expandtabs(size) | 返回bytes/bytearray b的副本,并且其中的制表符使用空格(個數為8的倍數,或指定的size)替代 |
| ba.extend(seq) | 使用序列seq中的所有ints對bytearray ba進行擴展,所有ints必須在0到255之間 |
| b.find(x, start, end) | 返回bytes/bytearray x在b(或b的start:end分片)中最左邊的位置,如果沒有找到,就返回-1.使用rfind()可以找到最右邊的位置 |
| b.fromhex(h) | 返回一個bytes對象,其字節對應的是str h中的十六進制整數 |
| b.index(x, start, end) | 返回x在b(或b的start:end分片)中最左邊的位置,如果沒找到,就產生ValueError異常。使用rindex()方法可以找到最右邊的位置 |
| ba.insert(p, i) | 將整數i(取值范圍0到255)插入到ba中的位置p處 |
| b.isalnum() | 如果bytes/bytearray b 非空,并且b中的每個字符都是ASCII字母數字字符就返回True |
| b.isalpha() | 如果bytes/bytearray b 非空,并且b中的每個字符都是ASCII字母字符,就返回True |
| b.isdigit() | ...ASCII數字... |
| b.islower() | 如果bytes/bytearray b包含至少一個可小寫的ASCII字符,并且其所有可小寫的字符都是小寫的,就返回True |
| b.isspace() | 如果bytes/bytearray b非空,并且b中的每個字符都是ASCII空格字符,就返回True |
| b.istitle() | 如果b是非空并且首字母大寫的,就返回True |
| b.isupper() | 如果bytes/bytearray b包含至少一個可大寫的ASCII字符,并且所有可大寫的字符都是大寫的,就返回True |
| b.join(seq) | 返回序列seq中每個bytes/bytearray 進行連接后所得的結果,并在每倆個之間添加一個b(可以為空) |
| b.ljust(width, byte) | 返回bytes/bytearray b 的副本,并且要求左對齊,長度為width, 使用空格或給定的byte(可選的)進行填充。使用rjust()方法可以右對齊 |
| b.lower() | 返回bytes/bytearray b的副本,其中ASCII字符都為小寫 |
| b.partition(sep) | 返回一個元組,其中包含3個bytes對象——包括b的最左邊bytes/bytearray sep之前的那部分、sep本身和b中sep之后的那部分;如果b中不包含sep,就返回b以及倆個為空的bytes對象。使用rpartition()方法可以在sep的最右邊出現處進行分割。 |
| ba.pop(p) | 移除并返回ba中索引位置p處的整數 |
| ba.remove(i) | 從bytearray ba 中移除整數i的首次出現 |
| b.replace(x, y, n) | 返回b的一個副本,其中bytes/bytearray x的每個(或最多n個,如果給定)出現都用y進行替代 |
| ba.reverse() | 反轉bytearray ba的字節 |
| b.split(x, n) | 返回一個字節列表,在x處進行分割(至多n次), 如果沒有給定n,就在可能的地方都進行分割;如果沒有給定x,就在空白字符處進行分割。使用rsplit()可以從右邊開始分割 |
| b.splitlines(f) | 返回對b進行分割(在行終結符處)后產生的行列表,如果f不為True,就剝離掉行終結符 |
| b.startswith(x, start, end) | 如果bytes/bytearray b(或b的start:end分片)以bytes/bytearray x(或元組x中的任意bytes/bytearray)引導,就返回True,否則返回False |
| b.strip(x) | 返回b的副本,并剝離掉開始與結尾處的空白字符(或bytes/bytearray x中的字節), lstrip()只剝離起始處,rstrip()只剝離結尾處的 |
| b.title() | 返回b的副本,其中每個字的第一個ASCII字符都是大寫的,其他所有ASCII字符則都是小寫的 |
| b.translate(bt, d) | 返回b的一個副本,其中不包括來自d的字節,并且每個字節都被bytes bt的相應字節替換 |
| b.upper() | 返回bytes/bytearray b的副本,其中ASCII字符都變為大寫 |
| b.zfill(w) | 返回b的副本,如果長度小于w,就使用引導字符(0x30)進行填充,使其長度為w |
7.1.2 帶可選壓縮的原始二進制數據
Little-endian, Big-endian | 小端,大端,低位,高位
點這里 點這里 點這里
7.2 文本文件的寫入與分析
7.2.1 寫入文本
def export_text(self, filename):
    """Write every incident to *filename* as UTF-8 text.

    Each incident becomes a "[report_id]" header, one "key=value" line
    per attribute, and the narrative (wrapped with the textwrap module)
    between ".NARRATIVE_START." and ".NARRATIVE_END." marker lines.

    :param filename: path of the text file to write
    :return: True on success, False if the file could not be written
    """
    wrapper = textwrap.TextWrapper(initial_indent=" ",
                                   subsequent_indent=" ")
    try:
        with open(filename, "w", encoding="utf8") as fh:
            for incident in self.values():
                narrative = "\n".join(
                    wrapper.wrap(incident.narrative.strip()))
                fh.write("[{0.report_id}]\n"
                         "date={0.date!s}\n"
                         "aircraft_id={0.aircraft_id}\n"
                         "aircraft_type={0.aircraft_type}\n"
                         "airport={airport}\n"
                         "pilot_percent_hours_on_type="
                         "{0.pilot_percent_hours_on_type}\n"
                         "pilot_total_hours={0.pilot_total_hours}\n"
                         # ":d" writes the bool as 0 or 1
                         "midair={0.midair:d}\n"
                         ".NARRATIVE_START.\n{narrative}\n"
                         ".NARRATIVE_END.\n\n".format(
                             incident,
                             airport=incident.airport.strip(),
                             narrative=narrative))
        return True
    except EnvironmentError as err:
        print("{0}: export error: {1}".format(
            os.path.basename(sys.argv[0]), err))
        return False

# 7.2.2 分析文本  (article section heading: parsing text)
def import_text_manual(self, filename):
    """Load incidents from a text file by parsing it line by line.

    The expected layout per incident is a "[report_id]" header,
    "key=value" lines, then the narrative between ".NARRATIVE_START."
    and ".NARRATIVE_END." marker lines. The collection is cleared before
    parsing starts.

    :param filename: path of the UTF-8 text file to read
    :return: True on success, False if the file could not be read or
        parsed
    """
    try:
        with open(filename, encoding="utf8") as fh:
            self.clear()
            data = {}
            narrative = None  # None means "not inside a narrative"
            for lino, line in enumerate(fh, start=1):
                line = line.rstrip()
                # Blank lines are skipped except inside a narrative,
                # where they are part of the text.
                if not line and narrative is None:
                    continue
                if narrative is not None:
                    if line == ".NARRATIVE_END.":
                        # End of this incident: build and store it.
                        data["narrative"] = textwrap.dedent(
                            "".join(narrative)).strip()
                        if len(data) != 9:  # some field is missing
                            raise IncidentError(
                                "missing data on line {0}".format(lino))
                        incident = Incident(**data)
                        self[incident.report_id] = incident
                        data = {}
                        narrative = None
                    else:
                        narrative.append(line + "\n")
                elif not data and line[0] == "[" and line[-1] == "]":
                    # Header line carrying the report ID.
                    data["report_id"] = line[1:-1]
                elif "=" in line:
                    # Attribute line; convert the typed fields.
                    key, value = line.split("=", 1)
                    if key == "date":
                        data[key] = datetime.datetime.strptime(
                            value, "%Y-%m-%d").date()
                    elif key == "pilot_percent_hours_on_type":
                        data[key] = float(value)
                    elif key == "pilot_total_hours":
                        data[key] = int(value)
                    elif key == "midair":
                        data[key] = bool(int(value))
                    else:
                        data[key] = value
                elif line == ".NARRATIVE_START.":
                    narrative = []
                else:
                    raise KeyError(
                        "parsing error on line {0}".format(lino))
        return True
    except (EnvironmentError, ValueError, KeyError,
            IncidentError) as err:
        print("{0}: import error: {1}".format(
            os.path.basename(sys.argv[0]), err))
        return False

# 7.2.3 使用正則表達式分析文本  (article section heading: parsing text with regular expressions)
def import_text_regex(self, filename):
    """Load incidents from a text file using regular expressions.

    The whole file is read into memory; one regex extracts each incident
    (ID, key/value section, narrative) and a second splits the key/value
    section into pairs. The collection is cleared before parsing starts.

    :param filename: path of the UTF-8 text file to read
    :return: True on success, False if the file could not be read or
        parsed
    """
    incident_re = re.compile(
        r"\[(?P<id>[^]]+)\](?P<keyvalues>.+?)"      # id, then key/values
        r"^\.NARRATIVE_START\.$(?P<narrative>.*?)"  # narrative text
        r"^\.NARRATIVE_END\.$",
        re.DOTALL | re.MULTILINE)
    key_value_re = re.compile(r"^\s*(?P<key>[^=]+)\s*=\s*"
                              r"(?P<value>.+)\s*$", re.MULTILINE)
    try:
        with open(filename, encoding="utf8") as fh:
            text = fh.read()
        self.clear()
        for incident_match in incident_re.finditer(text):
            data = {}
            data["report_id"] = incident_match.group("id")
            data["narrative"] = textwrap.dedent(
                incident_match.group("narrative")).strip()
            keyvalues = incident_match.group("keyvalues")
            for match in key_value_re.finditer(keyvalues):
                data[match.group("key")] = match.group("value")
            # Convert the typed fields; a missing key raises KeyError,
            # which is reported below.
            data["date"] = datetime.datetime.strptime(
                data["date"], "%Y-%m-%d").date()
            data["pilot_percent_hours_on_type"] = float(
                data["pilot_percent_hours_on_type"])
            data["pilot_total_hours"] = int(data["pilot_total_hours"])
            data["midair"] = bool(int(data["midair"]))
            if len(data) != 9:  # fewer than 9 fields: data is missing
                raise IncidentError("missing data")
            # The fields must be passed as keyword arguments.
            incident = Incident(**data)
            self[incident.report_id] = incident
        return True
    except (EnvironmentError, KeyError, ValueError,
            IncidentError) as err:
        print("{0}: import error: {1}".format(
            os.path.basename(sys.argv[0]), err))
        return False

# 7.3 寫入與分析XML文件  (article section heading: writing and parsing XML files)
XML和JSON數據格式對比(左側導航很好玩)
<?xml version="1.0" encoding="UTF-8"?>
<incidents>
<incident report_id="20070222008009G" date="2007-02-22" aircraft_id="880342" aircraft_type="CE-172-M" pilot_percent_hours_on_type="9.09090909091" pilot_total_hours="448" midair="0">
<airport>BOWERMAN</airport>
<narrative>
ON A GO-AROUND FROM A NIGHT CROSSWIND LANDING ATTEMPT THE AIRCRAFT HIT A RUNWAY EDGE LIGHT DAMAGING ONE PROPELLER.
</narrative>
</incident>
<incident> ... </incident>
<incident> ... </incident>
<incident> ... </incident>
</incidents>
7.3.1 元素樹
# Requires "import xml.etree.ElementTree" and "import xml.parsers.expat";
# a bare "import xml" does not load these submodules.
def export_xml_etree(self, filename):
    """Write the incidents to *filename* as XML using an element tree.

    Scalar fields become attributes of each <incident> element; the
    airport and the narrative become child elements.

    :param filename: path of the XML file to write
    :return: True on success, False if the file could not be written
    """
    root = xml.etree.ElementTree.Element("incidents")
    for incident in self.values():
        element = xml.etree.ElementTree.Element(
            "incident",
            report_id=incident.report_id,
            date=incident.date.isoformat(),
            aircraft_id=incident.aircraft_id,
            aircraft_type=incident.aircraft_type,
            pilot_percent_hours_on_type=str(
                incident.pilot_percent_hours_on_type),
            pilot_total_hours=str(incident.pilot_total_hours),
            midair=str(int(incident.midair)))
        airport = xml.etree.ElementTree.SubElement(element, "airport")
        airport.text = incident.airport.strip()
        narrative = xml.etree.ElementTree.SubElement(element,
                                                     "narrative")
        narrative.text = incident.narrative.strip()
        root.append(element)
    tree = xml.etree.ElementTree.ElementTree(root)
    try:
        tree.write(filename, "UTF-8")
    except EnvironmentError as err:
        print("{0}: export error: {1}".format(
            os.path.basename(sys.argv[0]), err))
        return False
    return True


def import_xml_tree(self, filename):
    """Load incidents from an XML file using an element tree.

    The collection is cleared once the file has been parsed
    successfully, before the incidents are rebuilt.

    :param filename: path of the XML file to read
    :return: True on success, False if the file could not be read or
        parsed, or if an incident is invalid
    """
    try:
        tree = xml.etree.ElementTree.parse(filename)
    except (EnvironmentError, xml.etree.ElementTree.ParseError,
            xml.parsers.expat.ExpatError) as err:
        # ElementTree reports malformed XML as ParseError.
        print("{0}: import error: {1}".format(
            os.path.basename(sys.argv[0]), err))
        return False
    self.clear()
    for element in tree.findall("incident"):
        try:
            data = {}
            for attribute in ("report_id", "date", "aircraft_id",
                              "aircraft_type",
                              "pilot_percent_hours_on_type",
                              "pilot_total_hours", "midair"):
                data[attribute] = element.get(attribute)
            data["date"] = datetime.datetime.strptime(
                data["date"], "%Y-%m-%d").date()
            data["pilot_percent_hours_on_type"] = float(
                data["pilot_percent_hours_on_type"])
            data["pilot_total_hours"] = int(data["pilot_total_hours"])
            data["midair"] = bool(int(data["midair"]))
            data["airport"] = element.find("airport").text.strip()
            narrative = element.find("narrative").text
            data["narrative"] = (narrative.strip()
                                 if narrative is not None else "")
            incident = Incident(**data)
            self[incident.report_id] = incident
        except (ValueError, LookupError, AttributeError, TypeError,
                IncidentError) as err:
            # AttributeError/TypeError: a required attribute or child
            # element is absent, so get()/find() returned None.
            print("{0}: import error: {1}".format(
                os.path.basename(sys.argv[0]), err))
            return False
    return True

# 7.3.2 DOM  (article section heading)
def export_xml_dom(self, filename):
    """Write the incidents to *filename* as XML using the DOM.

    :param filename: path of the XML file to write
    :return: True on success, False if the file could not be written
    """
    dom = xml.dom.minidom.getDOMImplementation()
    tree = dom.createDocument(None, "incidents", None)
    root = tree.documentElement  # the <incidents> root node
    for incident in self.values():
        element = tree.createElement("incident")
        for attribute, value in (
                ("report_id", incident.report_id),
                ("date", incident.date.isoformat()),
                ("aircraft_id", incident.aircraft_id),
                ("aircraft_type", incident.aircraft_type),
                ("pilot_percent_hours_on_type",
                 str(incident.pilot_percent_hours_on_type)),
                ("pilot_total_hours", str(incident.pilot_total_hours)),
                ("midair", str(int(incident.midair)))):
            element.setAttribute(attribute, value)
        for name, text in (("airport", incident.airport),
                           ("narrative", incident.narrative)):
            text_element = tree.createTextNode(text)
            name_element = tree.createElement(name)
            name_element.appendChild(text_element)
            element.appendChild(name_element)
        root.appendChild(element)
    try:
        with open(filename, "w", encoding="utf8") as fh:
            tree.writexml(fh, encoding="UTF-8")
        return True
    except EnvironmentError as err:
        print("{0}: export error: {1}".format(
            os.path.basename(sys.argv[0]), err))
        return False


def import_xml_dom(self, filename):
    """Load incidents from an XML file using the DOM.

    The collection is cleared once the file has been parsed
    successfully, before the incidents are rebuilt.

    :param filename: path of the XML file to read
    :return: True on success, False if the file could not be read or
        parsed, or if an incident is invalid
    """

    def get_text(node_list):
        """Return the stripped concatenation of the text nodes."""
        text = []
        for node in node_list:
            if node.nodeType == node.TEXT_NODE:
                text.append(node.data)
        return "".join(text).strip()

    try:
        dom = xml.dom.minidom.parse(filename)
    except (EnvironmentError, xml.parsers.expat.ExpatError) as err:
        print("{0}: import error: {1}".format(
            os.path.basename(sys.argv[0]), err))
        return False
    self.clear()
    for element in dom.getElementsByTagName("incident"):
        try:
            data = {}
            for attribute in ("report_id", "date", "aircraft_id",
                              "aircraft_type",
                              "pilot_percent_hours_on_type",
                              "pilot_total_hours", "midair"):
                data[attribute] = element.getAttribute(attribute)
            data["date"] = datetime.datetime.strptime(
                data["date"], "%Y-%m-%d").date()
            data["pilot_percent_hours_on_type"] = \
                float(data["pilot_percent_hours_on_type"])
            # Parsed as int, like the other importers in this article.
            data["pilot_total_hours"] = int(data["pilot_total_hours"])
            data["midair"] = bool(int(data["midair"]))
            # [0] raises IndexError (a LookupError) if the child element
            # is absent; that is reported below.
            airport = element.getElementsByTagName("airport")[0]
            data["airport"] = get_text(airport.childNodes)
            narrative = element.getElementsByTagName("narrative")[0]
            data["narrative"] = get_text(narrative.childNodes)
            incident = Incident(**data)
            self[incident.report_id] = incident
        except (ValueError, LookupError, IncidentError) as err:
            print("{0}: import error: {1}".format(
                os.path.basename(sys.argv[0]), err))
            return False
    return True

# 7.3.3 手動寫入XML  (article section heading: writing XML manually)
def export_xml_manual(self, filename):
    """Write the incidents to *filename* as XML, building the markup by hand.

    :param filename: path of the XML file to write
    :return: True on success, False if the file could not be written
    """
    try:
        with open(filename, "w", encoding="utf8") as fh:
            fh.write('<?xml version="1.0" encoding="UTF-8"?>\n')
            fh.write("<incidents>\n")
            for incident in self.values():
                # Each attribute literal ends with a space: XML needs
                # whitespace between attributes.
                fh.write('<incident report_id={report_id} '
                         'date="{0.date!s}" '
                         'aircraft_id={aircraft_id} '
                         'aircraft_type={aircraft_type} '
                         'pilot_percent_hours_on_type='
                         '"{0.pilot_percent_hours_on_type}" '
                         'pilot_total_hours="{0.pilot_total_hours}" '
                         'midair="{0.midair:d}">\n'
                         '<airport>{airport}</airport>\n'
                         '<narrative>\n{narrative}\n</narrative>\n'
                         '</incident>\n'.format(
                             incident,
                             # quoteattr() escapes the value and adds
                             # the surrounding quotes itself.
                             report_id=xml.sax.saxutils.quoteattr(
                                 incident.report_id),
                             aircraft_id=xml.sax.saxutils.quoteattr(
                                 incident.aircraft_id),
                             aircraft_type=xml.sax.saxutils.quoteattr(
                                 incident.aircraft_type),
                             # escape() handles &, < and > only, which
                             # is enough for element content.
                             airport=xml.sax.saxutils.escape(
                                 incident.airport),
                             narrative="\n".join(textwrap.wrap(
                                 xml.sax.saxutils.escape(
                                     incident.narrative.strip()), 70))))
            fh.write("</incidents>\n")
        return True
    except EnvironmentError as err:
        print("{0}: export error: {1}".format(
            os.path.basename(sys.argv[0]), err))
        return False

# 7.3.4 使用SAX分析XML  (article section heading: parsing XML with SAX)
與元素樹和DOM在內存中表示整個XML文檔不同的是,SAX分析器是逐步讀入并處理的,從而可能更快,對內存的需求也不那么明顯。然而,性能的優勢不能僅靠假設,尤其是元素樹與DOM都使用了快速的expat分析器。
class IncidentSaxHandler(xml.sax.handler.ContentHandler):
    """SAX content handler that rebuilds incidents while parsing."""

    # Elements whose character data is stored as a field of the incident.
    _TEXT_ELEMENTS = frozenset({"airport", "narrative"})

    def __init__(self, incidents):
        """
        :param incidents: mapping that receives the parsed incidents; it
            is cleared immediately
        """
        super().__init__()
        self.__data = {}
        self.__text = ""
        # Only a reference is kept, so the caller's mapping is filled in
        # place; that is why it is emptied here first.
        self.__incidents = incidents
        self.__incidents.clear()

    def startElement(self, name, attributes):
        """Handle a start tag.

        The SAX parser calls this with the tag name and its attributes.
        For <incident> the attributes are converted and stored; for every
        tag the accumulated text is reset.

        :param name: tag name
        :param attributes: mapping of attribute names to string values
        """
        if name == "incident":
            self.__data = {}
            for key, value in attributes.items():
                if key == "date":
                    self.__data[key] = datetime.datetime.strptime(
                        value, "%Y-%m-%d").date()
                elif key == "pilot_percent_hours_on_type":
                    self.__data[key] = float(value)
                elif key == "pilot_total_hours":
                    self.__data[key] = int(value)
                elif key == "midair":
                    self.__data[key] = bool(int(value))
                else:
                    self.__data[key] = value
        self.__text = ""

    def endElement(self, name):
        """Handle an end tag.

        </incident> builds the Incident and stores it; </airport> and
        </narrative> store the text collected since the last reset.

        :param name: tag name
        :raises IncidentError: if an incident does not have all 9 fields
        """
        if name == "incident":
            if len(self.__data) != 9:
                raise IncidentError("missing data")
            incident = Incident(**self.__data)
            self.__incidents[incident.report_id] = incident
        elif name in self._TEXT_ELEMENTS:
            self.__data[name] = self.__text.strip()
        # Must reset the name-mangled attribute that characters() appends
        # to; assigning to a differently named attribute leaves it intact.
        self.__text = ""

    def characters(self, text):
        """Accumulate character data; the parser may deliver it in pieces.

        :param text: the piece of text just read
        """
        self.__text += text


def import_xml_sax(self, filename):
    """Load incidents from an XML file using a SAX parser.

    :param filename: path of the XML file to read
    :return: True on success, False if the file could not be read or
        parsed
    """
    try:
        handler = IncidentSaxHandler(self)  # fills `self` while parsing
        parser = xml.sax.make_parser()
        parser.setContentHandler(handler)
        parser.parse(filename)
        return True
    except (EnvironmentError, ValueError, IncidentError,
            xml.sax.SAXParseException) as err:
        print("{0}: import error: {1}".format(
            os.path.basename(sys.argv[0]), err))
        return False

# 7.4 隨機存取二進制文件  (article section heading: random-access binary files)
前面幾節中,工作的基礎是程序的所有數據都是作為一個整體讀入內存,進行適當處理,最后再作為整體寫出。有些情況下,將數據放在磁盤上,并只讀入需要的部分,處理之后再將變化的部分寫回磁盤,這是一種更好的解決方案。
7.4.1 通用的BinaryRecordFile類
BinaryRecordFile.BinaryRecordFile類的API類似于列表,因為我們可以獲取/設置/刪除給定的索引位置的記錄。記錄被刪除后,只是簡單地標記為“已刪除”,這使得我們不必移動該記錄后面的所有記錄來保證連續性,也意味著刪除操作之后,所有原始的索引位置仍然是有效的。另一個好處是,只要取消“已刪除”標記,就可以反刪除一條記錄。當然,這種方法即便刪除了記錄,也仍然不能節省任何磁盤空間。為解決這一問題,我們將提供適當的方法來“壓縮”文件,移除已刪除的記錄(并使得該索引位置無效)。
Contact = struct.Struct("<15si")
contacts = BinaryRecordFile.BinaryRecordFile(filename, Contact.size)
我們以"<15si"格式(小端字節順序,一個15字節的字節字符串,一個4字節的有符號整數)創建了一個結構,用于表示每條記錄。 之后創建了一個BinaryRecordFile.BinaryRecordFile實例,并使用一個文件名和一個記錄大小做參數。如果文件不存在,會自動創建文件。
contacts[4] = Contact.pack("Abe Baker".encode("utf8"), 762)
contacts[5] = Contact.pack("Cindy Dove".encode("utf8"), 987)
上面的操作對文件相應地方進行了改寫。如果索引位置之前沒有記錄,就用0x00字節填充;"Abe Baker"少于15個字節,后面也用0x00填充。
文件對象屬性與方法(表)
| f.close() | 關閉文件對象f,并將屬性f.closed設置為True |
| f.closed | 文件已關閉,則返回True |
| f.encoding | bytes與str之間進行轉換時使用的編碼 |
| f.fileno() | 返回底層文件的文件描述符(只對那些有文件描述符的文件對象是有用的) |
| f.flush() | 清空文件對象f,這個翻譯有毒啊,應該是刷新當前緩沖區,讓緩沖區的內容立馬寫入文件,而無需等待。所以,上面加flush()的作用,應該就是如果設置為True,每一次寫入后,都立馬再從緩沖區寫入文件,而不是等待。 |
| f.isatty() | 如果文件對象與控制臺關聯,就返回True(只有在文件對象引用了真正的文件時才是可用的) |
| f.mode | 文件對象打開時使用的模式, 只讀 |
| f.name | 文件對象f的文件名(如果有) |
| f.newlines | 文本文件f中的換行字符串類型 |
| f.__next__() | 返回文件對象f的下一行 |
| f.peek(n) | 返回n個字節,而不移動文件指針的位置 |
| f.read(count) | 從文件對象f中讀取至多count個字節,如果沒有指定count,就讀取從當前文件指針直至最后的每個字節。以二進制模式讀時,返回bytes對象,以文本模式讀時,返回str對象。如果沒有要讀的內容(已到文件結尾),就返回一個空的bytes或str對象 |
| f.readable() | 如果f已經打開等待讀取,就返回True |
| f.readinto(ba) | 將至多len(ba)個字節讀入到bytearray ba中,并返回讀入的字節數——如果在文件結尾,就為0(只有在二進制模式下才可用) |
| f.readline(count) | 讀取下一行(如果指定count,并且在\n字符之前滿足這一數值,那么至多讀入count個字節),包括\n |
| f.readlines(sizehint) | 讀入到文件結尾之前的所有行,并以列表形式返回。如果給定sizehint,那么讀入大概至多sizehint個字節(如果底層文件支持) |
| f.seek(offset, whence) | 如果沒有給定whence,或其為os.SEEK_SET,就按給定的offset(相對于文件起始點)移動文件指針(并作為下一次讀、寫的起點);如果whence為os.SEEK_CUR,就相對于當前文件指針位置將其移動offset(可以為負值)個字節(whence為os.SEEK_END,則是相對于文件結尾)。在追加模式"a"下,寫入總是在結尾處進行的,而不管文件指針在何處。在文本模式下,只應該使用tell()方法的返回值作為offset |
| f.seekable() | 如果f支持隨機存取,就返回True |
| f.tell() | 返回當前指針位置(相對于文件起始處) |
| f.truncate(size) | 截取文件到當前文件指針所在位置,如果給定size,就到size大小處 |
| f.writable() | 如果f是為寫操作而打開的,就返回True |
| f.write(s) | 將bytes/bytearray對象s寫入文件(該文件以二進制模式打開),或將str對象s寫入到文件(該文件以文本模式打開) |
| f.writelines(seq) | 將對象序列(對文本文件而言是字符串,對二進制文件而言是字節字符串)寫入到文件 |
7.4.2 實例: BikeStock模塊的類
import struct

from practice import BinaryRecordFile


class Bike:
    """A bike model held in stock."""

    def __init__(self, identity, name, quantity, price):
        """
        :param identity: bike ID; must be longer than 3 characters
        :param name: bike name
        :param quantity: number of bikes in stock
        :param price: unit price of the bike
        """
        assert len(identity) > 3, \
            "invalid bike identity {0}".format(identity)
        self.__identity = identity
        self.name = name
        self.quantity = quantity
        self.price = price

    @property
    def identity(self):
        """The bike ID (read-only)."""
        return self.__identity

    @property
    def value(self):
        """Total value of the stock held: quantity times price."""
        return self.quantity * self.price


# Record layout: little-endian, 8-byte ID, 30-byte name, int quantity,
# double price.
_BIKE_STRUCT = struct.Struct("<8s30sid")


def _bike_from_record(record):
    """Build a Bike from one packed binary record."""
    ID, NAME, QUANTITY, PRICE = range(4)
    parts = list(_BIKE_STRUCT.unpack(record))
    # Strip the 0x00 bytes struct uses to pad short strings.
    parts[ID] = parts[ID].decode("utf8").rstrip("\x00")
    parts[NAME] = parts[NAME].decode("utf8").rstrip("\x00")
    return Bike(*parts)


def _record_from_bike(bike):
    """Pack a Bike into one binary record."""
    return _BIKE_STRUCT.pack(bike.identity.encode("utf8"),
                             bike.name.encode("utf8"),
                             bike.quantity, bike.price)


class BikeStock:
    """Bike stock kept in a BinaryRecordFile and looked up by bike ID."""

    def __init__(self, filename):
        """Open *filename* and index its records by bike identity.

        :param filename: path of the binary record file
        """
        self.__file = BinaryRecordFile.BinaryRecordFile(
            filename, _BIKE_STRUCT.size)
        self.__index_from_identity = {}
        for index in range(len(self.__file)):
            record = self.__file[index]
            # Slots that read back as None hold no bike and are skipped.
            if record is not None:
                bike = _bike_from_record(record)
                self.__index_from_identity[bike.identity] = index

    def append(self, bike):
        """Store *bike* in a new record at the end of the file."""
        index = len(self.__file)
        self.__file[index] = _record_from_bike(bike)
        self.__index_from_identity[bike.identity] = index

    def __delitem__(self, identity):
        """Delete the record of the bike with the given identity."""
        del self.__file[self.__index_from_identity[identity]]

    def __getitem__(self, identity):
        """Return the Bike with the given identity, or None if its record
        reads back as None."""
        record = self.__file[self.__index_from_identity[identity]]
        return None if record is None else _bike_from_record(record)

    def __change_stock(self, identity, amount):
        """Add *amount* (may be negative) to a bike's quantity.

        :return: True if the record was updated, False if it reads back
            as None
        """
        index = self.__index_from_identity[identity]
        record = self.__file[index]
        if record is None:
            return False
        bike = _bike_from_record(record)
        bike.quantity += amount
        self.__file[index] = _record_from_bike(bike)
        return True

    def increase_stock(self, identity, amount):
        """Increase the quantity of a bike by *amount*."""
        return self.__change_stock(identity, amount)

    def decrease_stock(self, identity, amount):
        """Decrease the quantity of a bike by *amount*."""
        return self.__change_stock(identity, -amount)

    def __change_bike(self, identity, attribute, value):
        """Set one attribute of a stored bike and write the record back.

        :return: True if the record was updated, False if it reads back
            as None
        """
        index = self.__index_from_identity[identity]
        record = self.__file[index]
        if record is None:
            return False
        bike = _bike_from_record(record)
        setattr(bike, attribute, value)
        self.__file[index] = _record_from_bike(bike)
        return True

    def __change_name(self, identity, name):
        """Rename the bike with the given identity."""
        return self.__change_bike(identity, "name", name)

    def __change_price(self, identity, price):
        """Set the unit price of the bike with the given identity."""
        return self.__change_bike(identity, "price", price)

    # Public entry points, matching increase_stock/decrease_stock; the
    # private methods are otherwise unreachable from outside the class.
    change_name = __change_name
    change_price = __change_price

    def __iter__(self):
        """Yield a Bike for every record that does not read back as None."""
        for index in range(len(self.__file)):
            record = self.__file[index]
            if record is not None:
                yield _bike_from_record(record)

# 轉載于:https://www.cnblogs.com/MTandHJ/p/10544417.html  (article footer: reposted from this URL)
總結
以上是生活随笔為你收集整理的Python Revisited Day 07 (文件处理)的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: PHP正则表达式大全
- 下一篇: AD17无法复制原理图到Word的解决方