刘泉皓

There is no strongest spell, only the strongest mage.

Investigating and fixing inaccurate speedtest upload results

01 Dec 2018 » memory

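speedtest-cli is a command-line speed test tool. Recently a user reported that on Ubuntu the upload speed only reaches 4 Mbps, while on CentOS uploads run at full speed. This post explains the cause in detail and gives several fixes.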

I. Discovering the problem

Yesterday a user posted on V2EX that speedtest on Ubuntu only uploads at 4 Mbps. My own testing turned up the following.

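My local connection is 100 Mbps, with a real upload speed of about 10 Mbps. On Ubuntu 18.04, with speedtest-cli installed via apt, the speedtest command reports an upload speed below 4 Mbps. The server is pinned with --server so that every run measures against the same server.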

liuxu:~$ speedtest --simple --server 5145
Ping: 5.97 ms
Download: 93.02 Mbit/s
Upload: 3.68 Mbit/s

Running the apt-installed speedtest.py directly reaches full speed.

liuxu:~$ dpkg -L speedtest-cli
...
/usr/bin/speedtest
/usr/bin/speedtest-cli
/usr/lib/python3/dist-packages/speedtest.py
....
liuxu:~$ python /usr/lib/python3/dist-packages/speedtest.py --simple --server 5145
Ping: 6.348 ms
Download: 93.25 Mbit/s
Upload: 12.62 Mbit/s

Downloading the latest code from GitHub (https://raw.githubusercontent.com/sivel/speedtest-cli/master/speedtest.py), saving it as speedtest-cli, and running ./speedtest-cli directly also reaches full speed.

liuxu:Downloads$ ls -l speedtest-cli 
-rwxr-xr-x 1 liuxu liuxu 61623 12月  1 18:16 speedtest-cli
liuxu:Downloads$ ./speedtest-cli --simple --server 5145
Ping: 6.35 ms
Download: 93.15 Mbit/s
Upload: 12.90 Mbit/s

Check the current Python version:

liuxu:Downloads$ python --version
Python 2.7.15rc1

Now look at what the apt-installed speedtest actually is:

liuxu:Downloads$ cat /usr/bin/speedtest
#!/usr/bin/python3
# EASY-INSTALL-ENTRY-SCRIPT: 'speedtest-cli==2.0.0','console_scripts','speedtest'
__requires__ = 'speedtest-cli==2.0.0'
import re
import sys
from pkg_resources import load_entry_point

if __name__ == '__main__':
    sys.argv[0] = re.sub(r'(-script\.pyw?|\.exe)?$', '', sys.argv[0])
    sys.exit(
        load_entry_point('speedtest-cli==2.0.0', 'console_scripts', 'speedtest')()
    )

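At first I suspected the User-Agent, because Python 2 and Python 3 send different User-Agent strings. But running the file downloaded from GitHub under each Python version showed that the Python version itself is the problem.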

liuxu:Downloads$ python2 ./speedtest-cli --simple --server 5145
Ping: 5.889 ms
Download: 93.19 Mbit/s
Upload: 11.80 Mbit/s
liuxu:Downloads$ python3 ./speedtest-cli --simple --server 5145
Ping: 5.974 ms
Download: 93.04 Mbit/s
Upload: 3.93 Mbit/s

II. Investigating the cause

Knowing that the Python version is to blame is not enough. To find out what exactly goes wrong, I did the following analysis.

First, the key parts of the speedtest code.

main() is simple; it just calls shell():

def main():
    try:
        shell()
    except KeyboardInterrupt:
        printer('\nCancelling...', error=True)
    except (SpeedtestException, SystemExit):
        e = get_exception()
        # Ignore a successful exit, or argparse exit
        if getattr(e, 'code', 1) not in (0, 2):
            raise SystemExit('ERROR: %s' % e)

shell() parses the arguments and then uses if statements to run the matching parts. Below is the code relevant to uploads:

...
try:
    speedtest = Speedtest(
        source_address=args.source,
        timeout=args.timeout,
        secure=args.secure
    )
except (ConfigRetrievalError,) + HTTP_ERRORS:
    printer('Cannot retrieve speedtest configuration', error=True)
    raise SpeedtestCLIError(get_exception())
...
if args.upload:
    printer('Testing upload speed', quiet,
            end=('', '\n')[bool(debug)])
    speedtest.upload(callback=callback, pre_allocate=args.pre_allocate)
    printer('Upload: %0.2f M%s/s' %
            ((results.upload / 1000.0 / 1000.0) / args.units[1],
                args.units[0]),
            quiet)
else:
    printer('Skipping upload test', quiet)
printer('Results:\n%r' % results.dict(), debug=True)

This simply calls the upload() method of the Speedtest class, which is the main class of the whole program. Here is upload():

def upload(self, callback=do_nothing, pre_allocate=True):
    """Test upload speed against speedtest.net"""

    sizes = []

    for size in self.config['sizes']['upload']:
        for _ in range(0, self.config['counts']['upload']):
            sizes.append(size)
    # request_count = len(sizes)
    request_count = self.config['upload_max']

    requests = []
    for i, size in enumerate(sizes):
        # We set ``0`` for ``start`` and handle setting the actual
        # ``start`` in ``HTTPUploader`` to get better measurements
        data = HTTPUploaderData(
            size,
            0,
            self.config['length']['upload'],
            shutdown_event=self._shutdown_event
        )
        if pre_allocate:
            data.pre_allocate()

        requests.append(
            (
                build_request(self.best['url'], data, secure=self._secure),
                size
            )
        )
    def producer(q, requests, request_count):
        for i, request in enumerate(requests[:request_count]):
            thread = HTTPUploader(
                i,
                request[0],
                start,
                request[1],
                self.config['length']['upload'],
                opener=self._opener,
                shutdown_event=self._shutdown_event
            )
            thread.start()
            q.put(thread, True)
            callback(i, request_count, start=True)

    finished = []

    def consumer(q, request_count):
        while len(finished) < request_count:
            thread = q.get(True)
            while thread.isAlive():
                thread.join(timeout=0.1)
            finished.append(thread.result)
            callback(thread.i, request_count, end=True)

    q = Queue(self.config['threads']['upload'])
    prod_thread = threading.Thread(target=producer,
                                    args=(q, requests, request_count))
    cons_thread = threading.Thread(target=consumer,
                                    args=(q, request_count))
    start = timeit.default_timer()
    prod_thread.start()
    cons_thread.start()
    while prod_thread.isAlive():
        prod_thread.join(timeout=0.1)
    while cons_thread.isAlive():
        cons_thread.join(timeout=0.1)

    stop = timeit.default_timer()
    self.results.bytes_sent = sum(finished)
    self.results.upload = (
        (self.results.bytes_sent / (stop - start)) * 8.0
    )
    return self.results.upload
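
The structure of upload() can be reduced to a small, network-free sketch. Everything here is illustrative: `FakeUploader`, `measure_upload` and `to_mbit` are hypothetical names, and the "upload" is just a short sleep. What it keeps from the real code is the bounded queue, the producer that starts worker threads, the consumer that joins them, and the final bytes-over-time calculation.

```python
import threading
import time
import timeit
from queue import Queue


class FakeUploader(threading.Thread):
    """Hypothetical stand-in for HTTPUploader: 'uploads' by sleeping."""

    def __init__(self, i, size, delay):
        super().__init__()
        self.i = i
        self.size = size
        self.delay = delay
        self.result = None

    def run(self):
        time.sleep(self.delay)   # pretend to send `size` bytes
        self.result = self.size


def measure_upload(sizes, max_threads=2, delay=0.01):
    """Return (bytes_sent, bits_per_second) for the simulated uploads."""
    finished = []
    q = Queue(max_threads)       # bounded queue, like Queue(threads['upload'])

    def producer():
        for i, size in enumerate(sizes):
            thread = FakeUploader(i, size, delay)
            thread.start()
            q.put(thread, True)  # blocks while the queue is full

    def consumer():
        while len(finished) < len(sizes):
            thread = q.get(True)
            thread.join()        # wait for this upload to finish
            finished.append(thread.result)

    start = timeit.default_timer()
    prod = threading.Thread(target=producer)
    cons = threading.Thread(target=consumer)
    prod.start()
    cons.start()
    prod.join()
    cons.join()
    elapsed = timeit.default_timer() - start

    bytes_sent = sum(finished)
    return bytes_sent, (bytes_sent / elapsed) * 8.0


def to_mbit(bits_per_second):
    """Same scaling as the printer() call in shell(): bits/s to Mbit/s."""
    return bits_per_second / 1000.0 / 1000.0
```

Because the consumer joins threads in queue order, one worker that never exits is enough to stall everything queued behind it.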

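The code is straightforward. It first reads some parameters and then sets up HTTPUploaderData, which holds the data to upload. It then defines a producer, which starts the upload threads in a loop, and a consumer, which collects the amount uploaded. The speed is computed as the amount uploaded divided by the upload time. The problem lies somewhere in this code, so I started by debugging producer():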

def producer(q, requests, request_count):
    for i, request in enumerate(requests[:request_count]):
        printer("%f: %f" % (timeit.default_timer(), i))
        thread = HTTPUploader(
            i,
            request[0],
            start,
            request[1],
            self.config['length']['upload'],
            opener=self._opener,
            shutdown_event=self._shutdown_event
        )
        thread.start()
        q.put(thread, True)
        callback(i, request_count, start=True)

finished = []

I only added one line that prints the current time and the current id. Running it:

liuxu:Downloads$ python3 ./speedtest-cli --no-download --simple --server 5145
98073.510638: 0.000000
98073.511213: 1.000000
98073.511581: 2.000000
98073.511784: 3.000000
98083.676585: 4.000000
98083.677201: 5.000000
98083.677600: 6.000000
...
98083.690243: 48.000000
98083.690406: 49.000000
98083.690577: 50.000000
Ping: 6.298 ms
Download: 0.00 Mbit/s
Upload: 1.65 Mbit/s

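The output shows that the first four timestamps are 98073 and all the later ones are 98083, a gap of 10 seconds. That is odd: the code runs in a for loop, so if nothing were wrong the timestamps should increase gradually. A reasonable guess is that the consumer is not taking items off the queue, so q.put(thread, True) blocks. So I commented out this print and added prints to consumer() instead: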

def consumer(q, request_count):
    while len(finished) < request_count:
        thread = q.get(True)
        printer("get thread")
        while thread.isAlive():
            printer("%f" % timeit.default_timer())
            thread.join(timeout=0.1)
        finished.append(thread.result)
        printer("append finished")
        callback(thread.i, request_count, end=True)

Running it:

liuxu:Downloads$ python3 ./speedtest-cli --no-download --simple --server 5145
get thread
98664.630203
98664.730297
98664.830474
98664.930651
98665.030855
98665.131053
...
98673.950274
98674.050491
98674.150703
98674.551635
98674.651857
98674.752034
append finished
get thread
append finished
get thread
append finished
get thread
...
append finished
get thread
append finished
Ping: 5.526 ms
Download: 0.00 Mbit/s
Upload: 1.65 Mbit/s

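This output is clearly wrong. Normally, get thread should be followed by a few wait timestamps; once the upload thread finishes sending its data it exits, the consumer's thread.isAlive() check becomes false, and append finished is printed. This repeats until everything has been uploaded. As evidence, here is the output under python2: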

liuxu:Downloads$ python2 ./speedtest-cli --no-download --simple --server 5145
get thread
1543663678.332612
1543663678.432770
append finished
get thread
append finished
get thread
append finished
get thread
append finished
get thread
1543663678.536858
1543663678.637320
append finished
get thread
append finished
get thread
append finished
get thread
append finished
get thread
1543663678.739243
1543663678.939648
1543663679.039879
1543663679.641264
append finished
get thread
append finished
get thread
append finished
get thread
1543663679.744043
1543663679.844810
1543663680.847105
1543663680.947357
append finished
get thread
append finished
get thread
1543663681.049641
1543663682.254425
1543663683.354699
append finished
get thread
append finished
get thread
append finished
get thread
append finished
get thread
1543663683.455565
1543663684.533582
1543663685.660982
1543663685.761228
append finished
get thread
1543663685.793456
1543663685.894058
1543663685.994291
1543663686.495346
1543663686.595550
append finished
get thread
append finished
get thread
append finished
get thread
1543663686.605275
1543663686.706074
1543663686.806334
1543663686.906582
append finished
get thread
1543663686.970811
1543663687.071231
1543663688.274106
append finished
get thread
1543663688.337986
1543663688.438180
1543663689.741877
1543663689.842160
append finished
get thread
append finished
get thread
...
append finished
get thread
append finished
Ping: 6.053 ms
Download: 0.00 Mbit/s
Upload: 11.72 Mbit/s

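This output is what a normal speed test looks like. So under python3 the consumer keeps waiting in while thread.isAlive(): for the upload thread to exit, but it never does. Next, look at what happens inside the threads that producer() creates: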

class HTTPUploader(threading.Thread):
    """Thread class for putting a URL"""

    def __init__(self, i, request, start, size, timeout, opener=None,
                 shutdown_event=None):
        threading.Thread.__init__(self)
        self.request = request
        self.request.data.start = self.starttime = start
        self.size = size
        self.result = None
        self.timeout = timeout
        self.i = i

        if opener:
            self._opener = opener.open
        else:
            self._opener = urlopen

        if shutdown_event:
            self._shutdown_event = shutdown_event
        else:
            self._shutdown_event = FakeShutdownEvent()

    def run(self):
        request = self.request
        try:
            if ((timeit.default_timer() - self.starttime) <= self.timeout and
                    not self._shutdown_event.isSet()):
                try:
                    f = self._opener(request)
                except TypeError:
                    # PY24 expects a string or buffer
                    # This also causes issues with Ctrl-C, but we will concede
                    # for the moment that Ctrl-C on PY24 isn't immediate
                    request = build_request(self.request.get_full_url(),
                                            data=request.data.read(self.size))
                    f = self._opener(request)
                f.read(11)
                f.close()
                self.result = sum(self.request.data.total)
            else:
                self.result = 0
        except (IOError, SpeedtestUploadTimeout):
            self.result = sum(self.request.data.total)
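
Note the final except branch of run(): the bytes handed out so far are still tallied when a timeout fires. A minimal sketch of that accounting follows. It assumes a request body that counts what it hands out and refuses further reads once the time budget is spent. `CountingBody`, `UploadTimeout`, `simulate_upload` and `make_fake_clock` are hypothetical names, not speedtest's API, and the loop reads the body directly instead of going through an HTTP client.

```python
import io
import timeit


class UploadTimeout(Exception):
    """Hypothetical stand-in for speedtest's SpeedtestUploadTimeout."""


class CountingBody(object):
    """File-like request body that records how many bytes it hands out."""

    def __init__(self, payload, start, timeout, clock=timeit.default_timer):
        self._data = io.BytesIO(payload)
        self.start = start
        self.timeout = timeout
        self.clock = clock
        self.total = [0]

    def read(self, n=10240):
        # Refuse to hand out more data once the time budget is spent.
        if self.clock() - self.start > self.timeout:
            raise UploadTimeout()
        chunk = self._data.read(n)
        self.total.append(len(chunk))
        return chunk


def simulate_upload(payload, timeout, clock=timeit.default_timer, chunk=10240):
    """Drain the body as a client would and return the bytes counted."""
    body = CountingBody(payload, clock(), timeout, clock)
    try:
        while body.read(chunk):
            pass
    except UploadTimeout:
        pass                     # same idea as the except branch in run()
    return sum(body.total)


def make_fake_clock(step):
    """Deterministic clock for the example: advances `step` seconds per call."""
    now = [-step]

    def clock():
        now[0] += step
        return now[0]

    return clock
```

With a fake clock that advances 0.4 s per call and a 1 s timeout, two 10240-byte chunks are counted before the third read is refused. A worker that times out therefore still contributes to the measured total.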

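The upload thread is also simple: it opens the request, blocks reading the response, records the upload total once the read succeeds, and exits. If something goes wrong here, the thread is probably stuck blocking and never exits, so the consumer never gets a result. Time to debug: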

def run(self):
    request = self.request
    try:
        if ((timeit.default_timer() - self.starttime) <= self.timeout and
                not self._shutdown_event.isSet()):
            try:
                printer("before open request")
                f = self._opener(request)
            except TypeError:
                # PY24 expects a string or buffer
                # This also causes issues with Ctrl-C, but we will concede
                # for the moment that Ctrl-C on PY24 isn't immediate
                request = build_request(self.request.get_full_url(),
                                        data=request.data.read(self.size))
                f = self._opener(request)
            printer("after open request")
            f.read(11)
            printer("read success")
            f.close()
            self.result = sum(self.request.data.total)
        else:
            self.result = 0
    except (IOError, SpeedtestUploadTimeout):
        self.result = sum(self.request.data.total)

The output:

liuxu:Downloads$ python3 ./speedtest-cli --no-download --simple --server 5145
before open request
before open request
before open request
before open request
Ping: 5.939 ms
Download: 0.00 Mbit/s
Upload: 1.65 Mbit/s

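This makes it clear: everything blocks at _opener(request), and the connection never gets as far as reading the response. So what is self._opener(request)? Tracing upward layer by layer, it turns out to be created by build_opener():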

def build_opener(source_address=None, timeout=10):
    """Function similar to ``urllib2.build_opener`` that will build
    an ``OpenerDirector`` with the explicit handlers we want,
    ``source_address`` for binding, ``timeout`` and our custom
    `User-Agent`
    """

    printer('Timeout set to %d' % timeout, debug=True)

    if source_address:
        source_address_tuple = (source_address, 0)
        printer('Binding to source address: %r' % (source_address_tuple,),
                debug=True)
    else:
        source_address_tuple = None

    handlers = [
        ProxyHandler(),
        SpeedtestHTTPHandler(source_address=source_address_tuple,
                             timeout=timeout),
        SpeedtestHTTPSHandler(source_address=source_address_tuple,
                              timeout=timeout),
        HTTPDefaultErrorHandler(),
        HTTPRedirectHandler(),
        HTTPErrorProcessor()
    ]

    opener = OpenerDirector()
    opener.addheaders = [('User-agent', build_user_agent())]

    for handler in handlers:
        opener.add_handler(handler)

    return opener

Uploads go through the SpeedtestHTTPHandler class:

class SpeedtestHTTPHandler(AbstractHTTPHandler):
    """Custom ``HTTPHandler`` that can build a ``HTTPConnection`` with the
    args we need for ``source_address`` and ``timeout``
    """
    def __init__(self, debuglevel=0, source_address=None, timeout=10):
        AbstractHTTPHandler.__init__(self, debuglevel)
        self.source_address = source_address
        self.timeout = timeout

    def http_open(self, req):
        return self.do_open(
            _build_connection(
                SpeedtestHTTPConnection,
                self.source_address,
                self.timeout
            ),
            req
        )

    http_request = AbstractHTTPHandler.do_request_

Since the request ends up calling http_open, I split it apart to see exactly which call blocks:

    def http_open(self, req):
        conn = _build_connection(
            SpeedtestHTTPConnection,
            self.source_address,
            self.timeout
        )
        printer("build connection success")
        res = self.do_open(
            conn,
            req
        )
        printer("open connection success")
        return res

    http_request = AbstractHTTPHandler.do_request_

The output again:

liuxu:Downloads$ python3 ./speedtest-cli --no-download --server 5145
Retrieving speedtest.net configuration...
build connection success
open connection success
Testing from China Unicom Beijing (114.249.28.210)...
Retrieving speedtest.net server list...
build connection success
open connection success
Retrieving information for the selected server...
Hosted by Beijing Unicom (Beijing) [1.67 km]: 5.578 ms
Skipping download test
Testing upload speedbuild connection success
.build connection success
.build connection success.
build connection success
....................................................
Upload: 1.65 Mbit/s

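The output is a bit messy. The upload part starts at "Testing upload speed", and it only shows "build connection success", never "open connection success". So the program blocks in self.do_open(), a method of AbstractHTTPHandler. This leads to an awkward problem: searching Baidu and Google for AbstractHTTPHandler turns up nothing useful.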

img1

img2

From the imports, however, it is clear that the class comes from urllib2 in Python 2 and from urllib.request in Python 3:

try:
    from urllib2 import (urlopen, Request, HTTPError, URLError,
                         AbstractHTTPHandler, ProxyHandler,
                         HTTPDefaultErrorHandler, HTTPRedirectHandler,
                         HTTPErrorProcessor, OpenerDirector)
except ImportError:
    from urllib.request import (urlopen, Request, HTTPError, URLError,
                                AbstractHTTPHandler, ProxyHandler,
                                HTTPDefaultErrorHandler, HTTPRedirectHandler,
                                HTTPErrorProcessor, OpenerDirector)

liuxu:python2.7$ ls -l /usr/lib/python2.7/urllib2.py
-rw-r--r-- 1 root root 52513 12月  1 18:17 /usr/lib/python2.7/urllib2.py
liuxu:urllib$ ls -l /usr/lib/python3.6/urllib/request.py
-rw-r--r-- 1 root root 99998 12月  1 18:17 /usr/lib/python3.6/urllib/request.py

So I looked at this class in Python 3:

class AbstractHTTPHandler(BaseHandler):

    def __init__(self, debuglevel=0):
        self._debuglevel = debuglevel

    def set_http_debuglevel(self, level):
        self._debuglevel = level

    def _get_content_length(self, request):
        return http.client.HTTPConnection._get_content_length(
            request.data,
            request.get_method())

    def do_request_(self, request):
        host = request.host
        if not host:
            raise URLError('no host given')

        if request.data is not None:  # POST
            data = request.data
            if isinstance(data, str):
                msg = "POST data should be bytes, an iterable of bytes, " \
                      "or a file object. It cannot be of type str."
                raise TypeError(msg)
            if not request.has_header('Content-type'):
                request.add_unredirected_header(
                    'Content-type',
                    'application/x-www-form-urlencoded')
            if (not request.has_header('Content-length')
                    and not request.has_header('Transfer-encoding')):
                content_length = self._get_content_length(request)
                if content_length is not None:
                    request.add_unredirected_header(
                            'Content-length', str(content_length))
                else:
                    request.add_unredirected_header(
                            'Transfer-encoding', 'chunked')

        sel_host = host
        if request.has_proxy():
            scheme, sel = splittype(request.selector)
            sel_host, sel_path = splithost(sel)
        if not request.has_header('Host'):
            request.add_unredirected_header('Host', sel_host)
        for name, value in self.parent.addheaders:
            name = name.capitalize()
            if not request.has_header(name):
                request.add_unredirected_header(name, value)

        return request

    def do_open(self, http_class, req, **http_conn_args):
        """Return an HTTPResponse object for the request, using http_class.

        http_class must implement the HTTPConnection API from http.client.
        """
        host = req.host
        if not host:
            raise URLError('no host given')

        # will parse host:port
        h = http_class(host, timeout=req.timeout, **http_conn_args)
        h.set_debuglevel(self._debuglevel)

        headers = dict(req.unredirected_hdrs)
        headers.update(dict((k, v) for k, v in req.headers.items()
                            if k not in headers))

        # TODO(jhylton): Should this be redesigned to handle
        # persistent connections?

        # We want to make an HTTP/1.1 request, but the addinfourl
        # class isn't prepared to deal with a persistent connection.
        # It will try to read all remaining data from the socket,
        # which will block while the server waits for the next request.
        # So make sure the connection gets closed after the (only)
        # request.
        headers["Connection"] = "close"
        headers = dict((name.title(), val) for name, val in headers.items())

        if req._tunnel_host:
            tunnel_headers = {}
            proxy_auth_hdr = "Proxy-Authorization"
            if proxy_auth_hdr in headers:
                tunnel_headers[proxy_auth_hdr] = headers[proxy_auth_hdr]
                # Proxy-Authorization should not be sent to origin
                # server.
                del headers[proxy_auth_hdr]
            h.set_tunnel(req._tunnel_host, headers=tunnel_headers)

        try:
            try:
                h.request(req.get_method(), req.selector, req.data, headers,
                          encode_chunked=req.has_header('Transfer-encoding'))
            except OSError as err: # timeout error
                raise URLError(err)
            r = h.getresponse()
        except:
            h.close()
            raise

        # If the server does not send us a 'Connection: close' header,
        # HTTPConnection assumes the socket should be left open. Manually
        # mark the socket to be closed when this response object goes away.
        if h.sock:
            h.sock.close()
            h.sock = None

        r.url = req.get_full_url()
        # This line replaces the .msg attribute of the HTTPResponse
        # with .headers, because urllib clients expect the response to
        # have the reason in .msg.  It would be good to mark this
        # attribute is deprecated and get then to use info() or
        # .headers.
        r.msg = r.reason
        return r

do_open() is also simple: set the headers, send the request, close the connection. Now compare it with the Python 2 code:

class AbstractHTTPHandler(BaseHandler):

    def __init__(self, debuglevel=0):
        self._debuglevel = debuglevel

    def set_http_debuglevel(self, level):
        self._debuglevel = level

    def do_request_(self, request):
        host = request.get_host()
        if not host:
            raise URLError('no host given')

        if request.has_data():  # POST
            data = request.get_data()
            if not request.has_header('Content-type'):
                request.add_unredirected_header(
                    'Content-type',
                    'application/x-www-form-urlencoded')
            if not request.has_header('Content-length'):
                request.add_unredirected_header(
                    'Content-length', '%d' % len(data))

        sel_host = host
        if request.has_proxy():
            scheme, sel = splittype(request.get_selector())
            sel_host, sel_path = splithost(sel)

        if not request.has_header('Host'):
            request.add_unredirected_header('Host', sel_host)
        for name, value in self.parent.addheaders:
            name = name.capitalize()
            if not request.has_header(name):
                request.add_unredirected_header(name, value)

        return request

    def do_open(self, http_class, req, **http_conn_args):
        """Return an addinfourl object for the request, using http_class.

        http_class must implement the HTTPConnection API from httplib.
        The addinfourl return value is a file-like object.  It also
        has methods and attributes including:
            - info(): return a mimetools.Message object for the headers
            - geturl(): return the original request URL
            - code: HTTP status code
        """
        host = req.get_host()
        if not host:
            raise URLError('no host given')

        # will parse host:port
        h = http_class(host, timeout=req.timeout, **http_conn_args)
        h.set_debuglevel(self._debuglevel)

        headers = dict(req.unredirected_hdrs)
        headers.update(dict((k, v) for k, v in req.headers.items()
                            if k not in headers))

        # We want to make an HTTP/1.1 request, but the addinfourl
        # class isn't prepared to deal with a persistent connection.
        # It will try to read all remaining data from the socket,
        # which will block while the server waits for the next request.
        # So make sure the connection gets closed after the (only)
        # request.
        headers["Connection"] = "close"
        headers = dict(
            (name.title(), val) for name, val in headers.items())

        if req._tunnel_host:
            tunnel_headers = {}
            proxy_auth_hdr = "Proxy-Authorization"
            if proxy_auth_hdr in headers:
                tunnel_headers[proxy_auth_hdr] = headers[proxy_auth_hdr]
                # Proxy-Authorization should not be sent to origin
                # server.
                del headers[proxy_auth_hdr]
            h.set_tunnel(req._tunnel_host, headers=tunnel_headers)

        try:
            h.request(req.get_method(), req.get_selector(), req.data, headers)
        except socket.error, err: # XXX what error?
            h.close()
            raise URLError(err)
        else:
            try:
                r = h.getresponse(buffering=True)
            except TypeError: # buffering kw not supported
                r = h.getresponse()

        # Pick apart the HTTPResponse object to get the addinfourl
        # object initialized properly.

        # Wrap the HTTPResponse object in socket's file object adapter
        # for Windows.  That adapter calls recv(), so delegate recv()
        # to read().  This weird wrapping allows the returned object to
        # have readline() and readlines() methods.

        # XXX It might be better to extract the read buffering code
        # out of socket._fileobject() and into a base class.

        r.recv = r.read
        fp = socket._fileobject(r, close=True)

        resp = addinfourl(fp, r.msg, req.get_full_url())
        resp.code = r.status
        resp.msg = r.reason
        return resp

There are not many differences, so I compared the two with Meld and found that the do_request_() methods of Python 2 and Python 3 set different headers.

img3

Request headers under Python 2:

liuxu:Downloads$ python ./speedtest-cli --no-download --simple --server 5145
...
{'Content-Length': '524288', 'Host': 'www2.unicomtest.com:8080', 'User-Agent': 'Mozilla/5.0 (Linux; U; 64bit; en-us) Python/2.7.15rc1 (KHTML, like Gecko) speedtest-cli/2.0.2', 'Connection': 'close', 'Cache-Control': 'no-cache', 'Content-Type': 'application/x-www-form-urlencoded'}

Request headers under Python 3:

liuxu:Downloads$ python3 ./speedtest-cli --no-download --simple --server 5145
{'Content-Type': 'application/x-www-form-urlencoded', 'Transfer-Encoding': 'chunked', 'Host': 'www2.unicomtest.com:8080', 'User-Agent': 'Mozilla/5.0 (Linux; U; 64bit; en-us) Python/3.6.7 (KHTML, like Gecko) speedtest-cli/2.0.2', 'Cache-Control': 'no-cache', 'Connection': 'close'}

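The culprit is most likely 'Transfer-Encoding': 'chunked'. Because Python 3 uses chunked transfer encoding for the request, the connection keeps the upload going, so the speedtest worker thread stays alive and never exits, and the consumer waits indefinitely. The thread queue in speedtest has only 2 slots, so the few threads that were already started upload their data and then wait until the timeout, at which point speedtest finishes.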
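
The effect of a 2-slot queue with a stuck consumer can be reproduced without any network. The sketch below is illustrative only: `started_before_stall` is a hypothetical helper, integers stand in for upload threads, and an `Event` stands in for an upload that never finishes. It records how many items the producer manages to start before `q.put` blocks.

```python
import threading
import time
from queue import Queue


def started_before_stall(maxsize=2, total=8, settle=0.3):
    """Return (ids started while the consumer is stuck, ids started overall)."""
    q = Queue(maxsize)
    release = threading.Event()
    started = []

    def producer():
        for i in range(total):
            started.append(i)    # corresponds to thread.start()
            q.put(i, True)       # blocks once the queue is full

    def consumer():
        q.get(True)              # take the first item ...
        release.wait()           # ... then hang, like join() on a stuck upload
        for _ in range(total - 1):
            q.get(True)

    prod = threading.Thread(target=producer)
    cons = threading.Thread(target=consumer)
    prod.start()
    cons.start()

    time.sleep(settle)           # give the producer time to run into the full queue
    snapshot = list(started)

    release.set()                # un-stick the consumer so both threads can finish
    prod.join()
    cons.join()
    return snapshot, list(started)
```

With `maxsize=2` the snapshot holds four ids: one held by the consumer, two sitting in the queue, and one started just before `put` blocks. That matches the four ids and the four "before open request" lines printed before the stall in the debug runs.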

So what makes Python 3 set chunked?

if (not request.has_header('Content-length')
        and not request.has_header('Transfer-encoding')):
    content_length = self._get_content_length(request)
    if content_length is not None:
        request.add_unredirected_header(
                'Content-length', str(content_length))
    else:
        request.add_unredirected_header(
                'Transfer-encoding', 'chunked')
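
This branch can be exercised offline. The sketch below assumes Python 3.6 or newer and uses a hypothetical helper, `upload_headers`, with a placeholder URL. It calls `do_request_` directly, which only fills in headers and sends nothing, then reports which headers were chosen.

```python
import io
import urllib.request


def upload_headers(data, headers=None):
    """Return the headers urllib.request would attach to an upload request."""
    opener = urllib.request.OpenerDirector()
    handler = urllib.request.HTTPHandler()
    opener.add_handler(handler)  # sets handler.parent, which do_request_ reads

    request = urllib.request.Request(
        "http://example.invalid/upload",   # placeholder URL, never contacted
        data=data,
        headers=headers or {},
    )
    handler.do_request_(request)           # header preparation only, no I/O
    return dict(request.header_items())


# A file-like body has no known length, so chunked encoding is selected.
print(upload_headers(io.BytesIO(b"x" * 1024)))

# An explicit Content-length header suppresses the chunked fallback.
print(upload_headers(io.BytesIO(b"x" * 1024), {"Content-length": "1024"}))

# A bytes body has a length, so Content-length is computed automatically.
print(upload_headers(b"x" * 1024))
```

speedtest's upload body is a file-like object, so it falls into the first case unless the caller supplies the length.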

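When there is no Content-length header and the length cannot be determined from the request body, chunked is set. So it is enough to set Content-length when speedtest uploads. A good place for this is Speedtest.upload():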

def upload(self, callback=do_nothing, pre_allocate=True):
    """Test upload speed against speedtest.net"""

    sizes = []

    for size in self.config['sizes']['upload']:
        for _ in range(0, self.config['counts']['upload']):
            sizes.append(size)
    # request_count = len(sizes)
    request_count = self.config['upload_max']

    requests = []
    for i, size in enumerate(sizes):
        # We set ``0`` for ``start`` and handle setting the actual
        # ``start`` in ``HTTPUploader`` to get better measurements
        data = HTTPUploaderData(
            size,
            0,
            self.config['length']['upload'],
            shutdown_event=self._shutdown_event
        )
        if pre_allocate:
            data.pre_allocate()
        # Add a Content-length header so the upload does not use chunked encoding
        headers = {'Content-length': size}
        requests.append(
            (
                build_request(self.best['url'], data, secure=self._secure, headers=headers),
                size
            )
        )

Note the last few lines. Now run the speed test again:

liuxu:Downloads$ python3 ./speedtest-cli --no-download --simple --server 5145
Ping: 6.02 ms
Download: 0.00 Mbit/s
Upload: 13.07 Mbit/s

That fixes the problem. To find out exactly which Python versions are affected, I searched the history with git:

liuxu:~/cpython/Lib/urllib$ pwd
/home/liuxu/cpython/Lib/urllib
liuxu:~/cpython/Lib/urllib$ git log -L 1258,1260:request.py
commit 3c0d0baf2badfad7deb346d1043f7d83bb92691f
Author: Martin Panter <vadmium+py@gmail.com>
Date:   Wed Aug 24 06:33:33 2016 +0000

    Issue #12319: Support for chunked encoding of HTTP request bodies
    
    When the body object is a file, its size is no longer determined with
    fstat(), since that can report the wrong result (e.g. reading from a pipe).
    Instead, determine the size using seek(), or fall back to chunked encoding
    for unseekable files.
    
    Also, change the logic for detecting text files to check for TextIOBase
    inheritance, rather than inspecting the “mode” attribute, which may not
    exist (e.g. BytesIO and StringIO).  The Content-Length for text files is no
    longer determined ahead of time, because the original logic could have been
    wrong depending on the codec and newline translation settings.
    
    Patch by Demian Brecht and Rolf Krahl, with a few tweaks by me.

diff --git a/Lib/urllib/request.py b/Lib/urllib/request.py
--- a/Lib/urllib/request.py
+++ b/Lib/urllib/request.py
@@ -1261,3 +1256,3 @@
                 else:
                     request.add_unredirected_header(
-                            'Content-length', '%d' % (len(mv) * mv.itemsize))
+                            'Transfer-encoding', 'chunked')

The commit id is 3c0d0baf2badfad7deb346d1043f7d83bb92691f, and it is available on GitHub. Searching for it in git gui shows that versions from v3.6.0b1 onward are affected.

img4
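
Given that the change first shipped in v3.6.0b1, a quick check of the running interpreter could look like the sketch below. `chunked_fallback_expected` is a hypothetical helper name, and the check only compares version numbers. It does not inspect urllib itself.

```python
import sys


def chunked_fallback_expected(version_info=sys.version_info):
    """True if this Python is 3.6 or newer, i.e. v3.6.0b1 and later."""
    return tuple(version_info[:2]) >= (3, 6)


if __name__ == "__main__":
    if chunked_fallback_expected():
        print("Python %d.%d: unpatched speedtest uploads may be affected"
              % sys.version_info[:2])
    else:
        print("Python %d.%d: not affected" % sys.version_info[:2])
```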

III. Solutions

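Based on the analysis above, if your Python version is v3.6.0b1 or newer, you need the fix. The python3 in Ubuntu 18.04 is newer than v3.6.0b1, so it is affected. The python3 in Ubuntu 16.04 is older than v3.6.0b1, so it is not. Other Linux distributions are untested. Here are several solutions: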

1. Pass a very small timeout when testing

liuxu:Downloads$ python3 ./speedtest-cli --no-download --simple --server 5145 --timeout 1
Ping: 6.067 ms
Download: 0.00 Mbit/s
Upload: 11.67 Mbit/s

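This works because a very small timeout makes the connection in each thread exit soon after uploading, so that later threads get to upload their data.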

def run(self):
    request = self.request
    try:
        if ((timeit.default_timer() - self.starttime) <= self.timeout and
                not self._shutdown_event.isSet()):
            try:
                f = self._opener(request)
            except TypeError:
                # PY24 expects a string or buffer
                # This also causes issues with Ctrl-C, but we will concede
                # for the moment that Ctrl-C on PY24 isn't immediate
                request = build_request(self.request.get_full_url(),
                                        data=request.data.read(self.size))
                f = self._opener(request)
            f.read(11)
            f.close()
            self.result = sum(self.request.data.total)
        else:
            self.result = 0
    except (IOError, SpeedtestUploadTimeout):
        self.result = sum(self.request.data.total)

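After the timeout, the program tallies the upload once, which is why the unpatched upload shows about 4 Mbps: only the first few threads get to upload. Incidentally, if the timeout option is not set, the default is 10 seconds.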

parser.add_argument('--timeout', default=10, type=PARSER_TYPE_FLOAT,
                        help='HTTP timeout in seconds. Default 10')

2. Use my patch

liuxu:speedtest$ wget -O speedtest-cli https://raw.githubusercontent.com/sivel/speedtest-cli/master/speedtest.py
liuxu:speedtest$ wget -O speedtest.patch https://gist.githubusercontent.com/liuquanhao/45a481618fa48bb3568b4c3e7cf11302/raw/c0f358bc6c88a3e33f44f28537bf680efc0c34cc/speedtest.patch
liuxu:speedtest$ patch < speedtest.patch
patching file speedtest-cli

3. Edit the code yourself

The code is in Speedtest.upload().

Original code:

    ...
    requests.append(
        (
            build_request(self.best['url'], data, secure=self._secure),
            size
        )
    )

def producer(q, requests, request_count):
    for i, request in enumerate(requests[:request_count]):

Fixed code:

    ...
    headers = {'Content-length': size}
    requests.append(
        (
            build_request(self.best['url'], data, secure=self._secure, headers=headers),
            size
        )
    )

def producer(q, requests, request_count):
    for i, request in enumerate(requests[:request_count]):

4. Wait for an official update

I have submitted a pull request to the official speedtest GitHub repository and hope it gets merged. You can also use my GitHub fork directly:

liuxu:speedtest$ wget -O speedtest-cli https://raw.githubusercontent.com/liuquanhao/speedtest-cli/master/speedtest.py
liuxu:speedtest$ chmod +x speedtest-cli

(Update 2018-12-05) The official GitHub repository has merged my commit into the devel branch. Please wait for the official release to be packaged.

