Skip to content

Latest commit

 

History

History
1677 lines (1163 loc) · 60.6 KB

File metadata and controls

1677 lines (1163 loc) · 60.6 KB

十、调试技巧

调试一个程序通常和编写它一样困难,有时甚至更困难。很多时候,程序员似乎花费了大量的时间来寻找这个难以捉摸的 bug,其原因可能是盯着他们的脸看,却没有暴露出来。

许多开发人员,即使是优秀的开发人员,也发现故障排除是一门困难的艺术。大多数情况下,当简单的方法(如正确放置的打印语句和策略性注释的代码)能够奏效时,程序员会求助于复杂的调试技术。

Python 在调试代码时会遇到自己的一系列问题。作为一种动态类型化语言,与类型相关的异常在 Python 中非常常见,这些异常是由于程序员假定某个类型是某个类型(当它是另一个类型时)而发生的。名称错误和属性错误也属于类似的类别。

在本章中,我们将专门关注软件的这一较少讨论的方面。

以下是我们将在本章中遇到的主题列表:

  • 最大子阵列问题:

    • “印刷”的力量
    • 分析与重写
    • 计时和优化代码
  • 简单的调试技巧和技巧:

    • 单词搜索程序

    • Word searcher 程序调试步骤 1

    • Word searcher 程序调试步骤 2

    • 单词搜索程序最终代码

    • 跳过代码块

    • 停止执行

    • 使用包装器的外部依赖项

    • 用返回值/数据替换函数(模拟)

    • 将数据作为缓存保存到文件或从文件加载数据

    • 将数据作为缓存保存到内存或从内存加载数据

    • Returning random/mock data

      生成随机患者数据

  • 日志记录作为一种调试技术:

    • 简单应用日志记录

    • Advanced logging—logger objects

      高级日志自定义格式和记录器

      高级日志写入系统日志

  • 使用调试器的调试工具:

    • 与 pdb 的调试会话

    • Pdb—similar tools

      iPdb

      Pdb++

  • 高级调试跟踪:

    • 跟踪模块
    • lptrace 程序
    • 使用 strace 进行系统调用跟踪

好的,让我们调试它!

最大子阵问题

首先,让我们看看一个有趣的问题。在这个问题中,目标是找到一个整数数组(序列)的最大连续子数组,该数组(序列)具有混合的负数和正数。

例如,假设我们有以下数组:

>>> a  = [-5, 20, -10, 30, 15]

通过快速扫描可以很明显地看出,最大和是子阵列[20, -10, 30, 15],给出了55的和。

让我们说,作为第一个切入点,您编写了以下代码:

import itertools

# max_subarray: v1
def max_subarray(sequence):
    """ Find sub-sequence in sequence having maximum sum """

    sums = []

    for i in range(len(sequence)):
        # Create all sub-sequences in given size
        for sub_seq in itertools.combinations(sequence, i):
            # Append sum
            sums.append(sum(sub_seq))

    return max(sums)

现在让我们尝试一下:

>>>  max_subarray([-5, 20, -10, 30, 15])
65

这个输出显然是错误的,因为在阵列中手动添加任何子阵列似乎都不会产生超过 55 的数字。我们需要调试代码。

印刷的力量

为了调试前面的示例,一个简单的、策略性地放置的**“print”**语句就完成了这个任务。让我们打印出内部for循环中的子序列:

该函数修改如下:

#最大子阵列:v1

def max_subarray(sequence):
    """ Find sub-sequence in sequence having maximum sum """

    sums = []
    for i in range(len(sequence)):
        for sub_seq in itertools.combinations(sequence, i):
            sub_seq_sum = sum(sub_seq)
            print(sub_seq,'=>',sub_seq_sum)
            sums.append(sub_seq_sum)

    return max(sums)

现在,代码执行并打印此输出:

>>> max_subarray([-5, 20, -10, 30, 15])
((), '=>', 0)
((-5,), '=>', -5)
((20,), '=>', 20)
((-10,), '=>', -10)
((30,), '=>', 30)
((15,), '=>', 15)
((-5, 20), '=>', 15)
((-5, -10), '=>', -15)
((-5, 30), '=>', 25)
((-5, 15), '=>', 10)
((20, -10), '=>', 10)
((20, 30), '=>', 50)
((20, 15), '=>', 35)
((-10, 30), '=>', 20)
((-10, 15), '=>', 5)
((30, 15), '=>', 45)
((-5, 20, -10), '=>', 5)
((-5, 20, 30), '=>', 45)
((-5, 20, 15), '=>', 30)
((-5, -10, 30), '=>', 15)
((-5, -10, 15), '=>', 0)
((-5, 30, 15), '=>', 40)
((20, -10, 30), '=>', 40)
((20, -10, 15), '=>', 25)
((20, 30, 15), '=>', 65)
((-10, 30, 15), '=>', 35)
((-5, 20, -10, 30), '=>', 35)
((-5, 20, -10, 15), '=>', 20)
((-5, 20, 30, 15), '=>', 60)
((-5, -10, 30, 15), '=>', 30)
((20, -10, 30, 15), '=>', 55)
65

通过查看打印语句的输出,问题现在清楚了。

有一个子数组[20, 30, 15](在前面的输出中以粗体突出显示),它产生65之和。但是,这不是有效的子阵列*,因为元素在原始阵列中不连续。*

显然,该程序是错误的,需要修复。

分析重写

快速分析告诉我们itertools.combinations的使用是罪魁祸首。我们使用它作为从阵列快速生成不同长度的所有子阵列的一种方法,但是使用组合尊重项目的顺序,并且生成所有组合,生成不连续的子阵列。

显然,我们需要重写这个。以下是重写的第一次尝试:

#最大子阵列:v2

def max_subarray(sequence):
    """ Find sub-sequence in sequence having maximum sum """

    sums = []

    for i in range(len(sequence)):
        for j in range(i+1, len(sequence)):
            sub_seq = sequence[i:j]
            sub_seq_sum = sum(sub_seq)
            print(sub_seq,'=>',sub_seq_sum)
            sums.append(sum(sub_seq))

    return max(sums)

现在输出如下:

>>> max_subarray([-5, 20, -10, 30, 15])
([-5], '=>', -5)
([-5, 20], '=>', 15)
([-5, 20, -10], '=>', 5)
([-5, 20, -10, 30], '=>', 35)
([20], '=>', 20)
([20, -10], '=>', 10)
([20, -10, 30], '=>', 40)
([-10], '=>', -10)
([-10, 30], '=>', 20)
([30], '=>', 30)
40

答案再次不正确,因为它给出了次优答案40,而不是正确答案,即55。同样,print 语句起到了解救作用,因为它清楚地告诉我们,主阵列本身并没有被认为是一个由一个错误导致的错误。

当用于迭代序列(数组)的数组索引比正确值少一个或多一个而关闭时,编程中会出现一次关闭或一次关闭错误。这通常出现在序列索引从零开始的语言中,如 C/C++、Java 或 Python。

在这种情况下,被一个错误关闭在这一行:

    "sub_seq = sequence[i:j]"

相反,正确的代码应如下所示:

    "sub_seq = sequence[i:j+1]"

使用此修复程序,我们的代码将按预期生成输出:

#最大子阵列:v2

def max_subarray(sequence):
    """ Find sub-sequence in sequence having maximum sum """

    sums = []

    for i in range(len(sequence)):
        for j in range(i+1, len(sequence)):
            sub_seq = sequence[i:j+1]
            sub_seq_sum = sum(sub_seq)
          print(sub_seq,'=>',sub_seq_sum)
            sums.append(sub_seq_sum)

    return max(sums)

以下是输出:

>>> max_subarray([-5, 20, -10, 30, 15])
([-5, 20], '=>', 15)
([-5, 20, -10], '=>', 5)
([-5, 20, -10, 30], '=>', 35)
([-5, 20, -10, 30, 15], '=>', 50)
([20, -10], '=>', 10)
([20, -10, 30], '=>', 40)
([20, -10, 30, 15], '=>', 55)
([-10, 30], '=>', 20)
([-10, 30, 15], '=>', 35)
([30, 15], '=>', 45)
55

让我们假设在这一点上,您认为代码是完整的。

您将代码传递给审阅者,他们提到您的代码虽然称为max_subarray,但实际上忘记返回子数组本身,而只返回总和。还有一个反馈,您不需要维护一个总和数组。

您将此反馈结合起来,生成代码的 3.0 版本,该版本修复了这两个问题:

#最大子阵列:v3

def max_subarray(sequence):
    """ Find sub-sequence in sequence having maximum sum """

    # Trackers for max sum and max sub-array
    max_sum, max_sub = 0, []

    for i in range(len(sequence)):
        for j in range(i+1, len(sequence)):
            sub_seq = sequence[i:j+1]
            sum_s = sum(sub_seq)
            if sum_s > max_sum:
                # If current sum > max sum so far, replace the values
                max_sum, max_sub = sum_s, sub_seq

    return max_sum, max_sub

>>>  max_subarray([-5, 20, -10, 30, 15])
(55, [20, -10, 30, 15])

请注意,我们在最后一个版本中删除了 print 语句,因为逻辑已经正确,因此不需要调试。

一切都好。

定时和优化代码

如果你稍微分析一下代码,你会发现代码在整个序列中执行了两次传递,一次是外部传递,一次是内部传递。因此,如果序列包含n项,则代码执行nn*传递。

我们从第四章了解到好的表现是值得的!,该代码以*O(n2)*的顺序执行。我们可以使用一个简单的context-managerwith操作符来测量花在代码上的实时时间。

我们的上下文管理器如下所示:

import time
from contextlib import contextmanager

@contextmanager
def timer():
    """ Measure real-time execution of a block of code """

    try:
        start = time.time()
        yield
    finally:
        end = (time.time() - start)*1000
        print 'time taken=> %.2f ms' % end

让我们修改代码,创建一个大小不同的随机数数组,以测量所花费的时间。我们将为此编写一个函数:

import random

def num_array(size):
    """ Return a list of numbers in a fixed random range
    of given size """

    nums = []
    for i in range(size):
        nums.append(random.randrange(-25, 30))
    return nums

让我们为各种大小的阵列计时逻辑,从 100 开始:

>>> with timer():
... max_subarray(num_array(100))
... (121, [7, 10, -17, 3, 21, 26, -2, 5, 14, 2, -19, -18, 23, 12, 8, -12, -23, 28, -16, -19, -3, 14, 16, -25, 26, -16, 4, 12, -23, 26, 22, 12, 23])
time taken=> 16.45 ms

对于 1000 个数组,代码如下所示:

>>> with timer():
... max_subarray(num_array(100))
... (121, [7, 10, -17, 3, 21, 26, -2, 5, 14, 2, -19, -18, 23, 12, 8, -12, -23, 28, -16, -19, -3, 14, 16, -25, 26, -16, 4, 12, -23, 26, 22, 12, 23])
time taken=> 16.45 ms

这大约需要 3.3 秒。

可以看出,输入大小为 10000 时,代码运行大约需要 2 到 3 个小时。

有没有办法优化代码?是的,有一个*O(n)*版本的相同的代码,看起来像这样:

def max_subarray(sequence):
    """ Maximum subarray – optimized version """

    max_ending_here = max_so_far = 0

    for x in sequence:
        max_ending_here = max(0, max_ending_here + x)
        max_so_far = max(max_so_far, max_ending_here)

    return max_so_far

使用此版本,所花费的时间要好得多:

>>> with timer():
... max_subarray(num_array(100))
... 240
time taken=> 0.77 ms

对于 1000 个阵列,所用时间如下:

>>> with timer():
... max_subarray(num_array(1000))
... 2272
time taken=> 6.05 ms

对于 10000 个数组,时间大约为 44 毫秒:

>>> with timer():
... max_subarray(num_array(10000))
... 19362
time taken=> 43.89 ms

简单的调试技巧和技巧

我们在前面的示例中看到了简单的print语句的威力。以类似的方式,可以使用其他简单的技术来调试程序,而无需借助调试器。

调试可以被认为是一个逐步排除的过程,直到程序员找到错误的原因。它主要包括以下步骤:

  • 分析代码并提出一组可能是 bug 来源的假设(原因)。
  • 使用适当的调试技术逐一测试假设。
  • 在测试的每一步中,您要么在测试成功时找到 bug 的来源,告诉您问题出在您测试的具体原因上;或者测试失败,然后继续测试下一个假设。
  • 重复最后一步,直到找到原因或放弃当前一组可能的假设。然后重新开始整个循环,直到(希望)找到原因。

单词搜索程序

在部分中,我们将通过示例逐一介绍一些简单的调试技术。我们将从单词搜索程序的示例开始,该程序在文件列表中查找包含特定单词的行,并在列表中追加和返回这些行。

以下是 word searcher 程序的代码列表:

import os
import glob

def grep_word(word, filenames):
    """ Open the given files and look for a specific word.
    Append lines containing word to a list and
    return it """

    lines, words = [], []

    for filename in filenames:
        print('Processing',filename)
        lines += open(filename).readlines()

    word = word.lower()
    for line in lines:
        if word in line.lower():
            lines.append(line.strip())

    # Now sort the list according to length of lines
    return sorted(words, key=len)

您可能已经注意到前面的代码中有一个微妙的错误,它附加到了错误的列表中。它从列表“行”中读取,并附加到同一个列表中,这将导致列表永远增长;当程序遇到包含给定单词的单行时,它将进入无限循环。

让我们在当前目录下运行程序:

>>> parse_filename('lines', glob.glob('*.py'))
(hangs)

在任何一天,你都可能很容易发现这个 bug。在一个糟糕的日子里,你可能会在这上面停留一段时间,没有注意到正在读取的同一个列表正在被追加。

以下是您可以做的几件事:

  • 由于代码挂起并且有两个循环,请找出导致问题的循环。为此,要么在两个循环之间放置一个 print 语句,要么放置一个sys.exit函数,这将导致解释器在该点退出。
  • 一个 print 语句可能会被开发人员遗漏,特别是当代码有许多其他 print 语句时,但是一个sys.exit当然永远不会被遗漏。

单词搜索程序调试步骤 1

代码重写如下,在两个循环之间插入一个特定的sys.exit(…)调用:

import os
import glob

def grep_word(word, filenames):
    """ Open the given files and look for a specific word.
    Append lines containing word to a list and
    return it """

    lines, words = [], []

    for filename in filenames:
        print('Processing',filename)
        lines += open(filename).readlines()

    sys.exit('Exiting after first loop')

    word = word.lower()
    for line in lines:
        if word in line.lower():
            lines.append(line.strip())

    # Now sort the list according to length of lines
    return sorted(words, key=len)

第二次尝试时,我们得到以下输出:

>>> grep_word('lines', glob.glob('*.py'))
Exiting after first loop

现在很明显,问题不在第一个循环中。您现在可以继续调试第二个循环(我们假设您完全不知道错误的变量用法,因此您通过调试以艰难的方式解决了这个问题)。

单词搜索程序调试步骤 2

无论何时您怀疑循环中的某个代码块导致错误,都有一些技巧可以调试它,并确认您的怀疑。这些措施包括:

  • 在代码块前面放一个战略 continue。如果问题消失,那么您已经确认了问题所在的特定块或任何下一个块。您可以继续向下移动continue语句,直到确定导致问题的特定代码块。
  • 通过在代码块前面加上一个前缀if 0:,使 Python 跳过该代码块。如果该块是一行代码或几行代码,则这更有用。
  • 如果一个循环中有很多代码,并且循环执行了很多次,那么 print 语句可能不会对您有多大帮助,因为会打印大量的数据,并且很难筛选和扫描它并找出问题所在。

在本例中,我们将使用第一个技巧来解决问题。以下是修改后的代码:

def grep_word(word, filenames):
    """ Open the given files and look for a specific word.
    Append lines containing word to a list and
    return it """

    lines, words = [], []

    for filename in filenames:
        print('Processing',filename)
        lines += open(filename).readlines()

    # Debugging steps
    # 1\. sys.exit
    # sys.exit('Exiting after first loop')

    word = word.lower()
    for line in lines:
        if word in line.lower():
            words.append(line.strip())
            continue

    # Now sort the list according to length of lines
    return sorted(words, key=len)

>>> grep_word('lines', glob.glob('*.py'))
[]

现在代码执行了,很明显问题出在处理步骤中。希望从这里开始,这只是找出 bug 的一个步骤,因为程序员最终通过调试过程看到了导致问题的线路。

单词搜索程序最终代码

我们花了一些时间,通过前几节中记录的几个调试步骤来解决程序中的问题。有了这个,我们假设的程序员能够在代码中找到问题并解决它。

以下是修复错误的最终代码:

def grep_word(word, filenames):
    """ Open the given files and look for a specific word.
    Append lines containing word to a list and
    return it """

    lines, words = [], []

    for filename in filenames:
        print('Processing',filename)
        lines += open(filename).readlines()

    word = word.lower()
    for line in lines:
        if word in line.lower():
            words.append(line.strip())

    # Now sort the list according to length of lines
    return sorted(words, key=len)

结果如下:

>>> grep_word('lines', glob.glob('*.py'))
['for line in lines:', 'lines, words = [], []', 
  '#lines.append(line.strip())', 
  'lines += open(filename).readlines()',
  'Append lines containing word to a list and', 
  'and return list of lines containing the word.', 
  '# Now sort the list according to length of lines', 
  "print('Lines => ', grep_word('lines', glob.glob('*.py')))"]

让我们总结一下到目前为止我们在本节中学习的简单调试技巧,并看看一些相关技巧和技术。

跳过代码块

程序员可以跳过他们怀疑在调试过程中导致错误的代码块。如果块在循环中,可以通过使用continue语句跳过执行来完成。我们已经看到了一个这样的例子。

如果该块在循环之外,可以使用if 0并将可疑代码移动到从属块,如下所示:

if 0:# Suspected code block
     perform_suspect_operation1(args1, args2, ...)
     perform_suspect_operation2(…)

如果在此之后 bug 消失了,那么您可以确定问题在于可疑的代码块。

这种技巧有其自身的缺陷,因为它需要将大块代码缩进右侧,一旦调试完成,这些代码块就应该缩进。因此,建议不要使用超过 5-6 行代码的代码。

停止执行

如果您正处于繁忙的编程过程中,并且您正试图找出一个难以捉摸的错误,已经尝试了打印语句、使用调试器和其他方法,一种相当激烈但通常非常有用的方法是使用函数sys.exit在可疑代码路径之前或之前停止执行表示

一个sys.exit(<strategic message>)会使程序停止运行,因此程序员不会错过这个*。在以下情况下,这通常非常有用:*

** 一段复杂的代码有一个难以捉摸的 bug,具体取决于输入的特定值或范围,这会导致捕获并忽略异常,但随后会导致程序出现问题。

  • In this case, checking for the specific value or range, and then exiting the code using the right message in the exception handler via sys.exit will allow you to pinpoint the problem. The programmer can then decide to fix the issue by correcting the input or variable processing code.

    在编写并发程序时,错误使用资源锁定或其他问题会使跟踪死锁、竞争条件等错误变得困难。由于通过调试器调试多线程或多进程程序非常困难,一种简单的方法是在实现正确的异常处理代码后,在可疑函数中添加一个sys.exit

  • 当您的代码有严重的内存泄漏或无限循环时,过一段时间后就很难进行调试,否则就无法确定问题所在。将sys.exit(<message>)行从一行代码移到下一行代码,直到确定问题为止,可以作为最后的手段。

使用包装器的外部依赖项

如果怀疑问题不在函数内部,而是在通过代码调用的函数中,可以使用这种方法。

由于该函数不在您的控制范围内,您可以尝试在具有控制权的模块中用包装函数替换它。

例如,下面是处理串行 JSON 数据的通用代码。让我们假设程序员在处理某些数据时发现了一个 bug(可能有某个键值对),并怀疑外部 API 是 bug 的来源。错误可能是 API 超时、返回损坏的响应,或者在最坏的情况下导致崩溃:

import external_api
def process_data(data):
    """ Process data using external API """

    # Clean up data—local function
    data = clean_up(data)
    # Drop duplicates from data—local function
    data = drop_duplicates(data)

    # Process line by line JSON
    for json_elem in data:
        # Bug ?
        external_api.process(json_elem)

验证这一点的一种方法是模拟伪造API 以获取数据的特定范围或值。在这种情况下,可以通过如下方式创建包装函数来完成:

def process(json_data, skey='suspect_key',svalue='suspect_value'):
    """ Fake the external API except for the suspect key & value """

    # Assume each JSON element maps to a Python dictionary

    for json_elem in json_data:
        skip = False

        for key in json_elem:
            if key == skey:
                if json_elem[key] == svalue:
                    # Suspect key,value combination - dont process
                    # this JSON element
                    skip = True
                    break

        # Pass on to the API
        if not skip:
            external_api.process(json_elem)

def process_data(data):
    """ Process data using external API """

    # Clean up data—local function
    data = clean_up(data)
    # Drop duplicates from data—local function
    data = drop_duplicates(data)

    # Process line by line JSON using local wrapper
    process(data)

如果你的怀疑确实正确,这将导致问题消失。然后,您可以将其用作测试代码,并与外部 API 的利益相关者通信以解决问题,或者编写代码以确保在发送到 API 的数据中跳过问题键值对。

用返回值/数据替换函数(模拟)

在现代 web 应用编程中,您永远不会远离程序中的阻塞 I/O 调用。这可能是一个简单的 URL 请求,一个稍微涉及的外部 API 请求,或者可能是一个代价高昂的数据库查询,这样的调用可能是 bug 的来源。

您可能会发现以下任一情况:

  • 此类调用的返回数据可能是问题的原因
  • 调用本身是问题的原因,例如 I/O 或网络错误、超时或资源争用

当您遇到成本高昂的 I/O 问题时,复制它们通常是一个问题。原因如下:

  • I/O 调用需要时间,因此调试这一过程会浪费大量时间,无法集中精力解决实际问题
  • 由于每次外部请求可能返回稍有不同的数据,因此后续调用可能无法针对该问题重复
  • 如果您使用的是外部付费 API,那么这些调用实际上可能会花费您的钱,因此您无法在调试和测试时耗尽大量此类调用

在这些情况下非常有用的一种常见技术是保存这些 API/函数的返回数据,然后通过使用函数的返回数据替换函数/API 本身来模拟函数。这是一种类似于模拟测试的方法,但在调试环境中使用。

让我们看一个 API 示例,该 API 返回网站上的业务列表,给定一个业务地址,包括其名称、街道地址、城市等详细信息。代码如下所示:

import config

search_api = 'http://api.%(site)s/listings/search'

def get_api_key(site):
    """ Return API key for a site """

    # Assumes the configuration is available via a config module
    return config.get_key(site)

def api_search(address, site='yellowpages.com'):
    """ API to search for a given business address
    on a site and return results """

    req_params = {}
    req_params.update({
        'key': get_api_key(site),
        'term': address['name'],
        'searchloc': '{0}, {1}, {1}'.format(address['street'],
                                            address['city'],
                                            address['state'])})
    return requests.post(search_api % locals(),
                         params=req_params)

def parse_listings(addresses, sites):
    """ Given a list of addresses, fetch their listings
    for a given set of sites, process them """

    for site in sites:
        for address in addresses:
            listing = api_search(address, site)
            # Process the listing
            process_listing(listing, site)

def process_listings(listing, site):
    """ Process a listing and analzye it """

     # Some heavy computational code
     # whose details we are not interested.

代码做了一些假设,其中之一是每个站点都有相同的 API URL 和参数。请注意,这仅用于说明目的。实际上,每个站点都有非常不同的 API 格式,包括它的 URL 和它接受的参数。

注意在最后一段代码中,实际工作是在process_listings函数中完成的,因为示例是说明性的,所以未显示该函数的代码。

假设您正在尝试调试此函数。但是,由于 API 调用中的延迟或错误,您发现在获取清单本身时浪费了大量宝贵的时间。您可以使用哪些技巧来避免这种依赖性?以下是您可以做的几件事:

  • 不要通过 API 获取列表,而是将它们保存到文件、数据库或内存存储中,然后按需加载
  • 通过缓存或记忆模式缓存api_search函数的返回值,以便在第一次调用后进一步调用,从内存返回数据
  • 模拟数据,并返回与原始数据具有相同特征的随机数据

我们将依次研究其中的每一项。

将数据作为缓存保存到文件或从文件加载数据

在这种技术中,您使用来自输入数据的唯一键来构造文件名。如果磁盘上存在匹配文件,则打开该文件并返回数据,否则调用并写入数据。这可以通过使用文件缓存装饰器来实现,如下代码所示:

import hashlib
import json
import os

def unique_key(address, site):
    """ Return a unique key for the given arguments """

    return hashlib.md5(''.join((address['name'],
                               address['street'],
                               address['city'],
                               site)).encode('utf-8')).hexdigest()

def filecache(func):
    """ A file caching decorator """

    def wrapper(*args, **kwargs):
        # Construct a unique cache filename
        filename = unique_key(args[0], args[1]) + '.data'

        if os.path.isfile(filename):
            print('=>from file<=')
            # Return cached data from file
            return json.load(open(filename))

        # Else compute and write into file
        result = func(*args, **kwargs)
        json.dump(result, open(filename,'w'))

        return result

    return wrapper

@filecache
def api_search(address, site='yellowpages.com'):
    """ API to search for a given business address
    on a site and return results """

    req_params = {}
    req_params.update({
        'key': get_api_key(site),
        'term': address['name'],
        'searchloc': '{0}, {1}, {1}'.format(address['street'],
                                            address['city'],
                                            address['state'])})
    return requests.post(search_api % locals(),
                         params=req_params)

以下是上述代码的工作原理:

  1. api_search功能以filecache装饰。
  2. filecache使用unique_key作为函数,计算唯一文件名,用于存储 API 调用的结果。在这种情况下,unique_key函数使用企业名称、街道和城市的组合以及查询的站点的散列来构建唯一值。
  3. 第一次调用函数时,通过 API 获取数据并存储在文件中。在进一步调用期间,数据直接从文件返回。

这在大多数情况下都非常有效。大多数数据只加载一次,并在进一步调用时从文件缓存返回。但是,这会遇到陈旧数据的问题,因为一旦创建了文件,数据总是从文件返回。同时,服务器上的数据可能已更改。

这可以通过使用内存键值存储来解决,并将数据保存在内存中,而不是保存在磁盘上的文件中。为此,可以使用众所周知的键值存储,如MemcachedMongoDBRedis。在下面的示例中,我们将向您展示如何使用 Redis 将filecache修饰符替换为memorycache修饰符。

将数据作为缓存保存到内存或从内存加载数据

在这种技术中,使用输入参数的唯一值构造唯一的内存缓存密钥。如果通过使用键查询在缓存存储上找到缓存,则从存储返回其值;或者调用并写入缓存。为了确保数据不会太陈旧,使用了固定的生存时间TTL)。我们使用 Redis 作为缓存存储引擎:

from redis import StrictRedis

def memoize(func, ttl=86400):
    """ A memory caching decorator """

    # Local redis as in-memory cache
    cache = StrictRedis(host='localhost', port=6379)

    def wrapper(*args, **kwargs):
        # Construct a unique key

        key = unique_key(args[0], args[1])
        # Check if its in redis
        cached_data = cache.get(key)
        if cached_data != None:
             print('=>from cache<=')
             return json.loads(cached_data)
         # Else calculate and store while putting a TTL
         result = func(*args, **kwargs)
         cache.set(key, json.dumps(result), ttl)

         return result

    return wrapper

注意,我们正在重用前面代码示例中的unique_key定义。

代码其余部分唯一的变化是我们将filecache装饰器替换为memoize装饰器:

@memoize    
def api_search(address, site='yellowpages.com'):
    """ API to search for a given business address
    on a site and return results """

    req_params = {}
    req_params.update({
        'key': get_api_key(site),
        'term': address['name'],
        'searchloc': '{0}, {1}, {1}'.format(address['street'],
                                            address['city'],
                                            address['state'])})
    return requests.post(search_api % locals(),
                         params=req_params)

本版本与前一版本相比的优点如下:

  • 缓存存储在内存中。不会创建其他文件。
  • 缓存是使用 TTL 创建的,超过 TTL 将过期。因此,过时数据的问题得以避免。TTL 是可定制的,在本例中默认为一天(86400 秒)。

还有一些其他技术可以模拟外部 API 调用和类似的依赖项。其中一些列示如下:

  • 在 Python 中使用StringIO对象来读/写数据,而不是使用文件。例如,filecachememoize装饰符可以很容易地修改为使用StringIO对象。
  • 使用可变默认参数(如字典或列表)作为缓存,并将结果写入其中。由于 Python 中的可变参数在重复调用后保持其状态,因此它有效地充当内存缓存。
  • 通过编辑系统主机的文件、添加相关主机的条目并将其 IP 设置为127.0.0.1,将外部 API 替换为对本地计算机上服务的替换/伪 API 调用的调用(127.0.0.1IP 地址)。对 localhost 的调用始终可以返回标准(固定)响应。

例如,在 Linux 和其他 POSIX 系统上,您可以在/etc/hosts文件中添加如下行:

# Only for testing—comment out after that!
127.0.0.1 api.website.com

请注意,只要您记住在测试后注释掉这些行,这种技术就是一种非常有用和聪明的方法!

返回随机/模拟数据

另一种对性能测试和调试最有用的技术是向函数提供相似,但与原始数据不同的数据。

例如,我们假设您正在开发一个应用,该应用处理特定保险计划下患者的患者/医生数据(如美国的医疗保险/医疗补助,印度的 ESI),以分析和找出常见疾病、政府支出方面的前十大健康问题等模式。

假设您的应用一次可以从数据库中加载和分析数万行患者数据,在峰值负载下可以扩展到 100-200 万行。您希望调试应用,并找出这种负载下的性能特征,但您没有任何实际数据,因为数据处于收集阶段。

在这种情况下,生成并返回模拟数据的库或函数非常有用。在本节中,我们将使用第三方 Python 库来实现这一点。

生成随机患者数据

让我们假设,对于患者,我们需要以下基本字段:

  • 名称
  • 年龄
  • 性别
  • 健康问题
  • 医生姓名
  • 血型
  • 投保与否
  • 上次就诊日期

Python 中的schematics库提供了一种使用简单类型生成此类数据结构的方法,然后可以对这些数据结构进行验证、转换和模拟。

schematics是可通过pip使用以下命令安装的库:

$ pip install schematics

要生成一个只有姓名和年龄的人的模型,只需在schematics中编写一个类即可:

from schematics import Model
from schematics.types import StringType, DecimalType

class Person(Model):
    name = StringType()
    age = DecimalType()

为了生成模拟数据,将返回一个模拟对象,并使用以下方法创建一个原语

>>> Person.get_mock_object().to_primitive()
{'age': u'12', 'name': u'Y7bnqRt'}
>>> Person.get_mock_object().to_primitive()
{'age': u'1', 'name': u'xyrh40EO3'}

可以使用原理图创建自定义类型。例如,对于患者模型,假设我们只对 18-80 岁年龄组感兴趣,因此我们需要返回该范围内的年龄数据。

以下自定义类型为我们提供了此功能:

from schematics.types import IntType

class AgeType(IntType):
    """ An age type for schematics """

    def __init__(self, **kwargs):
        kwargs['default'] = 18
        IntType.__init__(self, **kwargs)

    def to_primitive(self, value, context=None):
        return random.randrange(18, 80)

此外,由于 schematics 库返回的名称只是随机字符串,因此它们还有一些改进空间。下面的NameType类通过返回包含元音和辅音巧妙组合的名称对其进行了改进:

import string
import random

class NameType(StringType):
    """ A schematics custom name type """

    vowels='aeiou'
    consonants = ''.join(set(string.ascii_lowercase) - set(vowels))

    def __init__(self, **kwargs):
        kwargs['default'] = ''
        StringType.__init__(self, **kwargs)

   def get_name(self):
        """ A random name generator which generates
        names by clever placing of vowels and consontants """

        items = ['']*4

        items[0] = random.choice(self.consonants)
        items[2] = random.choice(self.consonants)

        for i in (1, 3):
            items[i] = random.choice(self.vowels)            

        return ''.join(items).capitalize()

    def to_primitive(self, value, context=None):
        return self.get_name()

当结合这两种新类型时,我们的Person类在返回模拟数据时看起来更好:

class Person(Model):
    name = NameType()
    age = AgeType()
>>> Person.get_mock_object().to_primitive()
{'age': 36, 'name': 'Qixi'}
>>> Person.get_mock_object().to_primitive()
{'age': 58, 'name': 'Ziru'}
>>> Person.get_mock_object().to_primitive()
{'age': 32, 'name': 'Zanu'}

以类似的方式,很容易找到一组自定义类型和标准类型,以满足患者模型所需的所有字段:

class GenderType(BaseType):
    """A gender type for schematics """

    def __init__(self, **kwargs):
        kwargs['choices'] = ['male','female']
        kwargs['default'] = 'male'
        BaseType.__init__(self, **kwargs)

class ConditionType(StringType):
    """ A gender type for a health condition """

    def __init__(self, **kwargs):
        kwargs['default'] = 'cardiac'
        StringType.__init__(self, **kwargs)     

    def to_primitive(self, value, context=None):
        return random.choice(('cardiac',
                              'respiratory',
                              'nasal',
                              'gynec',
                              'urinal',
                              'lungs',
                              'thyroid',
                              'tumour'))

import itertools

class BloodGroupType(StringType):
    """ A blood group type for schematics  """

    def __init__(self, **kwargs):
        kwargs['default'] = 'AB+'
        StringType.__init__(self, **kwargs)

    def to_primitive(self, value, context=None):
        return ''.join(random.choice(list(itertools.product(['AB','A','O','B'],['+','-']))))    

现在,将所有这些与一些标准类型和默认值结合到一个患者模型中,我们得到以下代码:

class Patient(Model):
    """ A model class for patients """

    name = NameType()
    age = AgeType()
    gender = GenderType()
    condition = ConditionType()
    doctor = NameType()
    blood_group = BloodGroupType()
    insured = BooleanType(default=True)
    last_visit = DateTimeType(default='2000-01-01T13:30:30')

现在,创建任意大小的随机数据就像调用 Patient 类上的get_mock_object方法来处理任意数量的n一样简单:

patients = map(lambda x: Patient.get_mock_object().to_primitive(), range(n))

对于示例,要创建 10000 个随机患者数据,我们使用以下方法:

>>> patients = map(lambda x: Patient.get_mock_object().to_primitive(), range(1000))

这些数据可以作为模拟数据输入到处理函数,直到实际数据可用为止。

注意:Python 中的 Faker 库对于生成各种各样的伪数据(例如名称、地址、URI、随机文本等)也很有用。

现在让我们从这些简单的技巧和技术转移到更复杂的内容,主要是在应用中配置日志记录。

作为调试技术的日志记录

Python 附带了标准库支持,通过恰当命名的logging模块进行日志记录。尽管 print 语句可以用作快速和基本的调试工具,但实际调试主要需要系统或应用生成一些日志。由于以下原因,日志记录非常有用:

  • 日志通常保存到特定的日志文件中,通常带有时间戳,并在服务器上保留一段时间,直到它们被转出。这使得调试变得容易,即使程序员在问题发生一段时间后正在调试它。
  • 日志记录可以在不同的级别上完成,从基本信息到详细调试级别,都可以更改应用输出的信息量。这允许程序员在不同的日志级别进行调试,以提取他们想要的信息,并找出问题所在。
  • 可以编写自定义记录器,它可以对各种输出执行日志记录。最基本的情况是,日志记录是对文件进行记录的,但也可以编写日志记录程序,对套接字、HTTP 流、数据库等进行写入。

简单应用日志

在 Python 中配置简单日志比较容易,如下图:

>>> import logging
>>> logging.warning('I will be back!')
WARNING:root:I will be back!

>>> logging.info('Hello World')
>>>

执行上述代码时不会发生任何事情,因为默认情况下,logging配置为警告级别。但是,配置日志记录以更改其级别非常容易。

以下代码将日志更改为info级别的日志,并添加目标文件以保存日志:

>>> logging.basicConfig(filename='application.log', level=logging.DEBUG)
>>> logging.info('Hello World')

如果我们检查application.log文件,会发现它包含以下行:

INFO:root:Hello World

为了向日志行添加时间戳,我们需要配置日志记录格式。这可以通过以下方式完成:

>>> logging.basicConfig(format='%(asctime)s %(message)s')

结合这一点,我们得到最终的日志配置,如下所示:

>>> logging.basicConfig(format='%(asctime)s %(message)s', filename='application.log', level=logging.DEBUG)
>>> logging.info('Hello World!')

现在,application.log的内容如下:

INFO:root:Hello World
2016-12-26 19:10:37,236 Hello World!

日志记录支持变量参数,用于向作为第一个参数提供的模板字符串提供参数。

直接记录用逗号分隔的参数不起作用。例如:

>>> import logging
>>> logging.basicConfig(level=logging.DEBUG)
>>> x,y=10,20
>>> logging.info('Addition of',x,'and',y,'produces',x+y)
--- Logging error ---
Traceback (most recent call last):
 File "/usr/lib/python3.5/logging/__init__.py", line 980, in emit
 msg = self.format(record)
 File "/usr/lib/python3.5/logging/__init__.py", line 830, in format
 return fmt.format(record)
 File "/usr/lib/python3.5/logging/__init__.py", line 567, in format
 record.message = record.getMessage()
 File "/usr/lib/python3.5/logging/__init__.py", line 330, in getMessage
 msg = msg % self.args
TypeError: not all arguments converted during string formatting
Call stack:
 File "<stdin>", line 1, in <module>
Message: 'Addition of'
Arguments: (10, 'and', 20, 'produces', 30)

但是,我们可以使用以下方法:

>>> logging.info('Addition of %s and %s produces %s',x,y,x+y)
INFO:root:Addition of 10 and 20 produces 30

前面的例子效果很好。

高级日志记录器对象

使用logging模块直接记录在最简单的情况下工作。但是,为了从logging模块中提取最大值,我们应该使用 logger 对象。它还允许我们执行许多自定义设置,例如自定义格式化程序、自定义处理程序等等。

让我们编写一个函数,返回这样一个自定义记录器。它接受应用名称、日志记录级别和另外两个选项—日志文件名,以及是否打开控制台日志记录:

import logging
def create_logger(app_name, logfilename=None, 
                             level=logging.INFO, console=False):

    """ Build and return a custom logger. Accepts the application name,
    log filename, loglevel and console logging toggle """

    log=logging.getLogger(app_name)
    log.setLevel(logging.DEBUG)
    # Add file handler
    if logfilename != None:
        log.addHandler(logging.FileHandler(logfilename))

    if console:
        log.addHandler(logging.StreamHandler())

    # Add formatter
    for handle in log.handlers:
        formatter = logging.Formatter('%(asctime)s : %(levelname)-8s - %(message)s', datefmt='%Y-%m-%d %H:%M:%S')

        handle.setFormatter(formatter)

    return log

让我们检查一下函数:

  1. 它使用logging.getLogger工厂函数创建logger对象,而不是直接使用logging
  2. 默认情况下,logger对象是无用的,因为它没有配置任何处理程序。处理程序是流包装器,负责记录到特定流,例如控制台、文件、套接字等。
  3. 配置是在这个 logger 对象上完成的,例如设置级别(通过setLevel方法),并添加处理程序,例如FileHandler用于记录到文件,以及StreamHandler用于记录到控制台。
  4. 日志消息的格式化在处理程序上完成,而不是在记录器对象本身上完成。我们使用标准格式<timestamp>: <level>—<message>,时间戳YY-mm-dd HH:MM:SS使用日期格式。

让我们看看这一点:

>>> log=create_logger('myapp',logfilename='app.log', console=True)
>>> log
<logging.Logger object at 0x7fc09afa55c0>
>>> log.info('Started application')
2016-12-26 19:38:12 : INFO     - Started application
>>> log.info('Initializing objects...')
2016-12-26 19:38:25 : INFO     - Initializing objects

检查同一目录中的 app.log 文件会显示以下内容:

2016-12-26 19:38:12 : INFOStarted application
2016-12-26 19:38:25 : INFOInitializing objects

高级日志记录自定义格式和记录器

我们研究了如何根据我们的需求创建和配置记录器对象。有时,需要反复检查并在日志行中打印额外的数据,这有助于调试。

调试应用(尤其是那些性能关键的应用)时出现的一个常见问题是找出每个函数或方法所需的时间。现在,虽然可以通过使用探查器分析应用和使用前面讨论的一些技术(如计时器上下文管理器)等方法来发现这一点,但通常可以编写自定义记录器来实现这一点。

让我们假设您的应用是一个业务列表 API 服务器,它响应前面一节中讨论的列表 API 请求。当它启动时,它需要初始化许多对象并从数据库加载一些数据。

假设作为性能优化的一部分,您已经调优了这些例程,并希望记录这些例程所花费的时间。我们将看看是否可以编写一个自定义记录器来为我们执行此操作:

import logging
import time
from functools import partial

class LoggerWrapper(object):
    """ A wrapper class for logger objects with
    calculation of time spent in each step """

    def __init__(self, app_name, filename=None, 
                       level=logging.INFO, console=False):
        self.log = logging.getLogger(app_name)
        self.log.setLevel(level)

        # Add handlers
        if console:
            self.log.addHandler(logging.StreamHandler())

        if filename != None:
            self.log.addHandler(logging.FileHandler(filename))

        # Set formatting
        for handle in self.log.handlers:

          formatter = logging.Formatter('%(asctime)s [%(timespent)s]: %(levelname)-8s - %(message)s', datefmt='%Y-%m-%d %H:%M:%S')                                   
             handle.setFormatter(formatter)

        for name in ('debug','info','warning','error','critical'):
            # Creating convenient wrappers by using functools
            func = partial(self._dolog, name)
            # Set on this class as methods
            setattr(self, name, func)

        # Mark timestamp
        self._markt = time.time()

    def _calc_time(self):
        """ Calculate time spent so far """

        tnow = time.time()
        tdiff = int(round(tnow - self._markt))

        hr, rem = divmod(tdiff, 3600)
        mins, sec = divmod(rem, 60)
        # Reset mark
        self._markt = tnow
        return '%.2d:%.2d:%.2d' % (hr, mins, sec)

    def _dolog(self, levelname, msg, *args, **kwargs):
        """ Generic method for logging at different levels """

        logfunc = getattr(self.log, levelname)
        return logfunc(msg, *args, extra={'timespent': self._calc_time()})         

我们已经构建了一个名为LoggerWrapper的定制类。让我们分析一下代码,看看它能做什么:

  1. 此类的__init__方法与我们之前编写的create_logger函数非常相似。它采用相同的参数,构造处理程序对象,并配置logger。但是,这次,logger对象是外部LoggerWrapper实例的一部分。
  2. 格式化程序采用一个名为timespent的附加变量模板。
  3. 似乎没有定义直接日志记录方法。但是,使用部分函数技术,我们将_dolog方法包装在不同的日志级别上,并使用setattr动态地将它们设置在类上为logging方法。
  4. _dolog方法通过使用第一次初始化的标记时间戳计算每个例程花费的时间,然后在每次调用中重置。使用名为 extra 的 dictionary 参数将花费的时间发送到日志记录方法。

让我们看看应用如何使用这个记录器包装器来测量在关键例程中花费的时间。下面是一个假设 Flask web 应用的示例:

    # Application code
    log=LoggerWrapper('myapp', filename='myapp.log',console=True)

    app = Flask(__name__)
    log.info("Starting application...")
    log.info("Initializing objects.")
    init()
    log.info("Initialization complete.")
    log.info("Loading configuration and data …")
    load_objects()
    log.info('Loading complete. Listening for connections …')
    mainloop()

请注意,所花费的时间记录在时间戳之后的方括号内。

假设最后一段代码生成如下输出:

2016-12-26 20:08:28 [00:00:00]: INFOStarting application...
2016-12-26 20:08:28 [00:00:00]: INFO     - Initializing objects.
2016-12-26 20:08:42 [00:00:14]: INFO     - Initialization complete.
2016-12-26 20:08:42 [00:00:00]: INFO     - Loading configuration and data ...
2016-12-26 20:10:37 [00:01:55]: INFO     - Loading complete. Listening for connections

从日志行可以明显看出,初始化耗时 14 秒,而配置和数据的加载耗时 1 分 55 秒。

通过添加类似的日志行,您可以快速而合理地准确地估计花在应用关键部分上的时间。保存在日志文件中,另一个额外的优点是不需要专门计算并将其保存到其他任何地方。

使用此自定义记录器时,请注意,显示为给定日志行所用时间的时间是前一行例程中所用的时间。

高级日志写入系统日志

POSIX 系统,如 Linux 和 Mac OS X,有一个系统日志文件,应用可以写入该文件。通常,此文件显示为/var/log/syslog。让我们看看如何将 Python 日志配置为写入系统日志文件。

您需要做的主要更改是向 logger 对象添加系统日志处理程序,如下所示:

log.addHandler(logging.handlers.SysLogHandler(address='/dev/log'))

让我们修改create_logger函数,使其能够写入syslog,并查看完整的代码:

import logging
import logging.handlers

def create_logger(app_name, logfilename=None, level=logging.INFO, 
                             console=False, syslog=False):
    """ Build and return a custom logger. Accepts the application name,
    log filename, loglevel and console logging toggle and syslog toggle """

    log=logging.getLogger(app_name)
    log.setLevel(logging.DEBUG)
    # Add file handler
    if logfilename != None:
        log.addHandler(logging.FileHandler(logfilename))

    if syslog:
        log.addHandler(logging.handlers.SysLogHandler(address='/dev/log'))

    if console:
        log.addHandler(logging.StreamHandler())

    # Add formatter
    for handle in log.handlers:
        formatter = logging.Formatter('%(asctime)s : %(levelname)-8s - %(message)s',  datefmt='%Y-%m-%d %H:%M:%S')
        handle.setFormatter(formatter)                             

    return log

现在,让我们在登录到syslog时尝试创建一个记录器:

>>> create_logger('myapp',console=True, syslog=True)
>>> log.info('Myapp - starting up…')

让我们检查一下 syslog,看看它是否真的被记录了:

$ tail -3 /var/log/syslog
Dec 26 20:39:54 ubuntu-pro-book kernel: [36696.308437] psmouse serio1: TouchPad at isa0060/serio1/input0 - driver resynced.
Dec 26 20:44:39 ubuntu-pro-book 2016-12-26 20:44:39 : INFO     - Myapp - starting up...
Dec 26 20:45:01 ubuntu-pro-book CRON[11522]: (root) CMD (command -v debian-sa1 > /dev/null && debian-sa1 1 1)

输出表明它确实如此。

使用调试器的调试工具

大多数程序员倾向于认为调试是他们应该使用调试器完成的事情。在本章中,到目前为止,我们已经看到调试不仅仅是一门精确的科学,它还是一门艺术,可以使用许多技巧和技巧来完成,而不是直接跳转到调试器。然而,迟早,我们会在本章中遇到调试器,我们就在这里!

Python 调试器,即众所周知的 pdb,是 Python 运行时的一部分。

从一开始运行脚本时,可以调用 Pdb,如下所示:

$ python3 -m pdb script.py

但是,程序员调用 pdb 的最常见方式是在代码中要进入调试器的位置插入以下行:

import pdb; pdb.set_trace()

让我们使用它,尝试并调试本章第一个示例的一个实例,即 max 子数组的总和。我们将调试代码的O(n)版本作为示例:

def max_subarray(sequence):
    """ Maximum subarray - optimized version """

    max_ending_here = max_so_far = 0
    for x in sequence:
        # Enter the debugger
        import pdb; pdb.set_trace()
        max_ending_here = max(0, max_ending_here + x)
        max_so_far = max(max_so_far, max_ending_here)

    return max_so_far

与 pdb 的调试会话

程序运行后,调试器立即进入第一个循环:

>>> max_subarray([20, -5, -10, 30, 10])
> /home/user/programs/maxsubarray.py(8)max_subarray()
-> max_ending_here = max(0, max_ending_here + x)
-> for x in sequence:
(Pdb) max_so_far
20

您可以使用(s停止执行。Pdb 将执行当前行,并停止:

> /home/user/programs/maxsubarray.py(7)max_subarray()
-> max_ending_here = max(0, max_ending_here + x)

您只需键入变量并按[Enter键即可检查变量:

(Pdb) max_so_far
20

当前堆栈轨迹可以使用(w或何处打印。箭(→) 指示当前堆栈帧:

(Pdb) w

<stdin>(1)<module>()
> /home/user/programs/maxsubarray.py(7)max_subarray()
-> max_ending_here = max(0, max_ending_here + x)

可以使用(c继续执行到下一个断点,也可以继续执行:

> /home/user/programs/maxsubarray.py(6)max_subarray()
-> for x in sequence:
(Pdb) max_so_far
20
(Pdb) c
> /home/user/programs/maxsubarray.py(6)max_subarray()
-> for x in sequence:
(Pdb) max_so_far
20
(Pdb) c
> /home/user/programs/maxsubarray.py(6)max_subarray()
-> for x in sequence:
(Pdb) max_so_far
35
(Pdb) max_ending_here
35

在前面的代码中,我们继续三次迭代for循环,直到最大值从 20 变为 35。让我们检查一下我们在序列中的位置:

(Pdb) x
30

名单上还有一项,即最后一项。让我们使用(llist命令在点检查源代码:

(Pdb) l
  1     
  2     def max_subarray(sequence):
  3         """ Maximum subarray - optimized version """
  4     
  5         max_ending_here = max_so_far = 0
  6  ->     for x in sequence:
  7             max_ending_here = max(0, max_ending_here + x)
  8             max_so_far = max(max_so_far, max_ending_here)
  9             import pdb; pdb.set_trace()
 10     
 11         return max_so_far

可以分别使用(uup和(ddown命令上下遍历堆栈帧:

(Pdb) up
> <stdin>(1)<module>()
(Pdb) up
*** Oldest frame
(Pdb) list
[EOF]
(Pdb) d
> /home/user/programs/maxsubarray.py(6)max_subarray()
-> for x in sequence:

现在让我们从函数返回:

(Pdb) r
> /home/user/programs/maxsubarray.py(6)max_subarray()
-> for x in sequence:
(Pdb) r
--Return--
> /home/user/programs/maxsubarray.py(11)max_subarray()->45
-> return max_so_far

函数的返回值为45

Pdb 有许多其他命令,而不是我们在这里介绍的。但是,我们不打算让本课程成为一个成熟的 pdb 教程。感兴趣的程序员可以参考 Web 上的文档了解更多信息。

Pdb 类似工具

Python 社区已经在 pdb 之上构建了许多有用的工具,但添加了更多有用的功能、开发人员的易用性,或者两者兼而有之。

iPdb

iPdb 已启用 iPythonpdb。它导出函数以访问 iPython 调试器。它还具有制表符补全、语法突出显示、更好的回溯和内省方法。

iPdb 可以与 pip 一起安装。

下面的屏幕截图显示了使用 iPdb 进行调试的会话,与我们之前使用 pdb 时使用的功能相同。注意iPdb提供的语法突出显示:

iPdb

iPdb 正在运行,显示语法高亮显示

还要注意的是,iPdb 提供了比 pdb 更完整的堆栈跟踪:

iPdb

iPdb 正在运行,显示出比 pdb 更完整的堆栈跟踪

请注意,iPdb 使用 iPython 作为默认运行时,而不是 Python。

Pdb++

Pdb++是 Pdb 的替代品,其功能类似于 iPdb,但它在默认 Python 运行时工作,而不需要 iPython。Pdb++也可以通过 pip 安装。

一旦安装了 pdb++,它将接管所有导入 pdb 的地方,因此根本不需要更改代码。

Pdb++执行智能命令解析。例如,如果变量名与标准 Pdb 命令冲突,Pdb 将优先显示该命令而不是显示变量内容。Pdb++聪明地解决了这个问题。

以下是显示 Pdb++运行的屏幕截图,包括语法突出显示、选项卡完成和智能命令解析:

Pdb++

Pdb++在运行中注意 smart 命令解析,其中变量 c 被正确解释

高级调试跟踪

从一开始就跟踪程序通常可以用作高级调试技术。跟踪允许开发人员跟踪程序执行,查找调用方/被调用方关系,并计算出程序运行期间执行的所有函数。

跟踪模块

Python 附带了一个默认trace模块作为其标准库的一部分。

跟踪模块采用–trace--count–listfuncs选项中的一个|第一个选项在执行时跟踪并打印所有源代码行。第二个选项生成带注释的文件列表,其中显示语句执行的次数。后者只显示通过运行程序执行的所有函数。

以下是由trace模块的–trace选项调用的子阵列问题的屏幕截图:

The trace module

通过使用–trace 选项,使用跟踪模块跟踪程序执行。

如您所见,跟踪模块跟踪整个程序执行,一行一行地打印代码。由于这段代码的大部分是一个for循环,您实际上可以看到循环中的代码行被打印出来,即循环执行的次数(五次)。

–trackcalls选项跟踪并打印调用者和被调用者函数之间的关系。

跟踪模块还有许多其他选项,如跟踪调用、生成带注释的文件列表、报告等。我们不会对这些内容进行详尽的讨论,因为读者可以在 Web 上参考本模块的文档以了解更多信息。

lptrace 程序

当调试服务器并试图找出生产环境中的性能或其他问题时,程序员需要的通常不是跟踪模块给出的 Python 系统或堆栈跟踪,而是实时附加到进程并查看正在执行的功能。

可以使用 pip 安装 lptrace。请注意,它不适用于Python3

lptrace包允许您这样做。它不提供要运行的脚本,而是通过进程 ID(如运行服务器、应用等)连接到运行 Python 程序的现有进程。

在下面的屏幕截图中,您可以看到lptrace正在调试我们在第 8 章中开发的 Twisted 聊天服务器,架构模式 Pythonic 方法live。会话显示客户端已连接时的活动:

The lptrace program

lptrace 命令调试 Twisted 中的聊天服务器

有很多日志行,但是您可以观察到当客户端连接时,Twisted 协议的一些著名方法是如何被记录的,例如ConnectionMode。诸如接受之类的套接字调用也可以被视为接受来自客户端的连接的一部分。

使用 strace 进行系统调用跟踪

Strace是一个 Linux 命令,允许用户跟踪运行程序调用的系统调用和信号。它不是 Python 独有的,但可以用来调试任何程序。Strace 可与 lptrace 结合使用,对程序的系统调用进行故障排除。

Stracelptrace类似,可以将其附加到正在运行的进程。也可以从命令行调用它来运行进程,但当运行附加到进程(如服务器)时,它更有用。

例如,此屏幕截图显示连接到聊天服务器运行时的 strace 输出:

System call tracing using strace

连接到 Twisted 聊天服务器的 strace 命令

strace命令证实了服务器等待epoll句柄进行传入连接的lptrace命令的结论。

这是客户端连接时发生的情况:

System call tracing using strace

显示客户端连接到 Twisted 聊天服务器的系统调用的 strace 命令

Strace 是一个非常强大的工具,它可以与特定于运行时的工具(如 Python 的 lptrace)相结合,以便在生产环境中进行高级调试。

总结

在本章中,我们学习了 Python 的不同调试技术。我们从简单的print语句开始,然后用简单的技巧调试 Python 程序,比如在循环中使用continue语句,在代码块之间策略性地放置sys.exit调用,等等。

然后我们详细研究了调试技术,特别是模拟和随机化数据。举例讨论了文件缓存和 Redis 等内存数据库缓存等技术。

一个使用 Python schematics 库的示例显示了为医疗领域中的一个假设应用生成随机数据。

下一节是关于日志记录和使用它作为调试技术。我们讨论了使用日志记录模块的简单日志记录,使用logger对象的高级日志记录,并通过创建一个日志记录器包装器来结束讨论,该包装器具有函数内记录时间的自定义格式。我们还研究了一个写入 syslog 的示例。

本章的最后专门讨论了调试工具。您学习了 Python 调试器 pdb 的基本命令,并快速查看了提供更好体验的类似工具,即 iPdb 和 pdb++。我们在本章的结尾简要讨论了跟踪工具,如 lptrace 和 Linux 上无处不在的strace程序。

这使我们得出本章和本书的结论。*