LinuxQuestions.org
Visit Jeremy's Blog.
Home Forums Tutorials Articles Register
Go Back   LinuxQuestions.org > Forums > Linux Forums > Linux - Distributions > Puppy
User Name
Password
Puppy This forum is for the discussion of Puppy Linux.

Notices


Reply
  Search this Thread
Old 11-04-2014, 10:29 PM   #1
Fixit7
Senior Member
 
Registered: Mar 2014
Location: El Lago, Texas
Distribution: Ubuntu_Mate 16.04
Posts: 1,374

Rep: Reputation: 169Reputation: 169
DSL Speed test


Run a DSL speed test and log the results to a text file.

I tried to attach a zip file, but it won't let me.

Seems like a compressed file would save on bandwidth and storage.


Quote:
#!/bin/bash
# DSL_Speed_Test.sh
# Determine DSL Upload and Download speeds and redirect to a text file.
# Copyright 2012-2014 Matt Martz
# All Rights Reserved.
# Uses speedtest_cli.py located at http://xmodulo.com/check-internet-sp...ine-linux.html
#
# Tahr Puppy 6.0
# SiegeWorks 2014 A.P.K.
#
echo >> speed__test.txt
date "+DATE: %m/%d/%y%nTIME: %r" >> speed__test.txt
echo >> speed__test.txt
/usr/local/bin/speedtest-cli >> speed__test.txt
Quote:
# speedtest_cli.py
#
#!/usr/bin/env python
# -*- coding: utf-8 -*-
# Copyright 2012-2014 Matt Martz
# All Rights Reserved.
# Determine DSL Download and Upload Speed
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.

__version__ = '0.3.1'

# Some global variables we use
source = None
shutdown_event = None

import os
import re
import sys
import math
import signal
import socket
import timeit
import threading

# Used for bound_interface
socket_socket = socket.socket

try:
import xml.etree.cElementTree as ET
except ImportError:
try:
import xml.etree.ElementTree as ET
except ImportError:
from xml.dom import minidom as DOM
ET = None

# Begin import game to handle Python 2 and Python 3
try:
from urllib2 import urlopen, Request, HTTPError, URLError
except ImportError:
from urllib.request import urlopen, Request, HTTPError, URLError

try:
from httplib import HTTPConnection, HTTPSConnection
except ImportError:
from http.client import HTTPConnection, HTTPSConnection

try:
from Queue import Queue
except ImportError:
from queue import Queue

try:
from urlparse import urlparse
except ImportError:
from urllib.parse import urlparse

try:
from urlparse import parse_qs
except ImportError:
try:
from urllib.parse import parse_qs
except ImportError:
from cgi import parse_qs

try:
from hashlib import md5
except ImportError:
from md5 import md5

try:
from argparse import ArgumentParser as ArgParser
except ImportError:
from optparse import OptionParser as ArgParser

try:
import builtins
except ImportError:
def print_(*args, **kwargs):
"""The new-style print function taken from
https://pypi.python.org/pypi/six/

"""
fp = kwargs.pop("file", sys.stdout)
if fp is None:
return

def write(data):
if not isinstance(data, basestring):
data = str(data)
fp.write(data)

want_unicode = False
sep = kwargs.pop("sep", None)
if sep is not None:
if isinstance(sep, unicode):
want_unicode = True
elif not isinstance(sep, str):
raise TypeError("sep must be None or a string")
end = kwargs.pop("end", None)
if end is not None:
if isinstance(end, unicode):
want_unicode = True
elif not isinstance(end, str):
raise TypeError("end must be None or a string")
if kwargs:
raise TypeError("invalid keyword arguments to print()")
if not want_unicode:
for arg in args:
if isinstance(arg, unicode):
want_unicode = True
break
if want_unicode:
newline = unicode("\n")
space = unicode(" ")
else:
newline = "\n"
space = " "
if sep is None:
sep = space
if end is None:
end = newline
for i, arg in enumerate(args):
if i:
write(sep)
write(arg)
write(end)
else:
print_ = getattr(builtins, 'print')
del builtins


def bound_socket(*args, **kwargs):
"""Bind socket to a specified source IP address"""

global source
sock = socket_socket(*args, **kwargs)
sock.bind((source, 0))
return sock


def distance(origin, destination):
"""Determine distance between 2 sets of [lat,lon] in km"""

lat1, lon1 = origin
lat2, lon2 = destination
radius = 6371 # km

dlat = math.radians(lat2 - lat1)
dlon = math.radians(lon2 - lon1)
a = (math.sin(dlat / 2) * math.sin(dlat / 2) + math.cos(math.radians(lat1))
* math.cos(math.radians(lat2)) * math.sin(dlon / 2)
* math.sin(dlon / 2))
c = 2 * math.atan2(math.sqrt(a), math.sqrt(1 - a))
d = radius * c

return d


class FileGetter(threading.Thread):
"""Thread class for retrieving a URL"""

def __init__(self, url, start):
self.url = url
self.result = None
self.starttime = start
threading.Thread.__init__(self)

def run(self):
self.result = [0]
try:
if (timeit.default_timer() - self.starttime) <= 10:
f = urlopen(self.url)
while 1 and not shutdown_event.isSet():
self.result.append(len(f.read(10240)))
if self.result[-1] == 0:
break
f.close()
except IOError:
pass


def downloadSpeed(files, quiet=False):
"""Function to launch FileGetter threads and calculate download speeds"""

start = timeit.default_timer()

def producer(q, files):
for file in files:
thread = FileGetter(file, start)
thread.start()
q.put(thread, True)
if not quiet and not shutdown_event.isSet():
sys.stdout.write('.')
sys.stdout.flush()

finished = []

def consumer(q, total_files):
while len(finished) < total_files:
thread = q.get(True)
while thread.isAlive():
thread.join(timeout=0.1)
finished.append(sum(thread.result))
del thread

q = Queue(6)
prod_thread = threading.Thread(target=producer, args=(q, files))
cons_thread = threading.Thread(target=consumer, args=(q, len(files)))
start = timeit.default_timer()
prod_thread.start()
cons_thread.start()
while prod_thread.isAlive():
prod_thread.join(timeout=0.1)
while cons_thread.isAlive():
cons_thread.join(timeout=0.1)
return (sum(finished) / (timeit.default_timer() - start))


class FilePutter(threading.Thread):
"""Thread class for putting a URL"""

def __init__(self, url, start, size):
self.url = url
chars = '0123456789ABCDEFGHIJKLMNOPQRSTUVWXYZ'
data = chars * (int(round(int(size) / 36.0)))
self.data = ('content1=%s' % data[0:int(size) - 9]).encode()
del data
self.result = None
self.starttime = start
threading.Thread.__init__(self)

def run(self):
try:
if ((timeit.default_timer() - self.starttime) <= 10 and
not shutdown_event.isSet()):
f = urlopen(self.url, self.data)
f.read(11)
f.close()
self.result = len(self.data)
else:
self.result = 0
except IOError:
self.result = 0


def uploadSpeed(url, sizes, quiet=False):
"""Function to launch FilePutter threads and calculate upload speeds"""

start = timeit.default_timer()

def producer(q, sizes):
for size in sizes:
thread = FilePutter(url, start, size)
thread.start()
q.put(thread, True)
if not quiet and not shutdown_event.isSet():
sys.stdout.write('.')
sys.stdout.flush()

finished = []

def consumer(q, total_sizes):
while len(finished) < total_sizes:
thread = q.get(True)
while thread.isAlive():
thread.join(timeout=0.1)
finished.append(thread.result)
del thread

q = Queue(6)
prod_thread = threading.Thread(target=producer, args=(q, sizes))
cons_thread = threading.Thread(target=consumer, args=(q, len(sizes)))
start = timeit.default_timer()
prod_thread.start()
cons_thread.start()
while prod_thread.isAlive():
prod_thread.join(timeout=0.1)
while cons_thread.isAlive():
cons_thread.join(timeout=0.1)
return (sum(finished) / (timeit.default_timer() - start))


def getAttributesByTagName(dom, tagName):
"""Retrieve an attribute from an XML document and return it in a
consistent format

Only used with xml.dom.minidom, which is likely only to be used
with python versions older than 2.5
"""
elem = dom.getElementsByTagName(tagName)[0]
return dict(list(elem.attributes.items()))


def getConfig():
"""Download the speedtest.net configuration and return only the data
we are interested in
"""

uh = urlopen('http://www.speedtest.net/speedtest-config.php')
configxml = []
while 1:
configxml.append(uh.read(10240))
if len(configxml[-1]) == 0:
break
if int(uh.code) != 200:
return None
uh.close()
try:
try:
root = ET.fromstring(''.encode().join(configxml))
config = {
'client': root.find('client').attrib,
'times': root.find('times').attrib,
'download': root.find('download').attrib,
'upload': root.find('upload').attrib}
except AttributeError:
root = DOM.parseString(''.join(configxml))
config = {
'client': getAttributesByTagName(root, 'client'),
'times': getAttributesByTagName(root, 'times'),
'download': getAttributesByTagName(root, 'download'),
'upload': getAttributesByTagName(root, 'upload')}
except SyntaxError:
print_('Failed to parse speedtest.net configuration')
sys.exit(1)
del root
del configxml
return config


def closestServers(client, all=False):
"""Determine the 5 closest speedtest.net servers based on geographic
distance
"""

uh = urlopen('http://www.speedtest.net/speedtest-servers-static.php')
serversxml = []
while 1:
serversxml.append(uh.read(10240))
if len(serversxml[-1]) == 0:
break
if int(uh.code) != 200:
return None
uh.close()
try:
try:
root = ET.fromstring(''.encode().join(serversxml))
elements = root.getiterator('server')
except AttributeError:
root = DOM.parseString(''.join(serversxml))
elements = root.getElementsByTagName('server')
except SyntaxError:
print_('Failed to parse list of speedtest.net servers')
sys.exit(1)
servers = {}
for server in elements:
try:
attrib = server.attrib
except AttributeError:
attrib = dict(list(server.attributes.items()))
d = distance([float(client['lat']), float(client['lon'])],
[float(attrib.get('lat')), float(attrib.get('lon'))])
attrib['d'] = d
if d not in servers:
servers[d] = [attrib]
else:
servers[d].append(attrib)
del root
del serversxml
del elements

closest = []
for d in sorted(servers.keys()):
for s in servers[d]:
closest.append(s)
if len(closest) == 5 and not all:
break
else:
continue
break

del servers
return closest


def getBestServer(servers):
"""Perform a speedtest.net latency request to determine which
speedtest.net server has the lowest latency
"""

results = {}
for server in servers:
cum = []
url = '%s/latency.txt' % os.path.dirname(server['url'])
urlparts = urlparse(url)
for i in range(0, 3):
try:
if urlparts[0] == 'https':
h = HTTPSConnection(urlparts[1])
else:
h = HTTPConnection(urlparts[1])
start = timeit.default_timer()
h.request("GET", urlparts[2])
r = h.getresponse()
total = (timeit.default_timer() - start)
except (HTTPError, URLError, socket.error):
cum.append(3600)
continue
text = r.read(9)
if int(r.status) == 200 and text == 'test=test'.encode():
cum.append(total)
else:
cum.append(3600)
h.close()
avg = round((sum(cum) / 6) * 1000, 3)
results[avg] = server
fastest = sorted(results.keys())[0]
best = results[fastest]
best['latency'] = fastest

return best


def ctrl_c(signum, frame):
"""Catch Ctrl-C key sequence and set a shutdown_event for our threaded
operations
"""

global shutdown_event
shutdown_event.set()
raise SystemExit('\nCancelling...')


def version():
"""Print the version"""

raise SystemExit(__version__)


def speedtest():
"""Run the full speedtest.net test"""

global shutdown_event, source
shutdown_event = threading.Event()

signal.signal(signal.SIGINT, ctrl_c)

description = (
'Command line interface for testing internet bandwidth using '
'speedtest.net.\n'
'------------------------------------------------------------'
'--------------\n'
'https://github.com/sivel/speedtest-cli')

parser = ArgParser(description=description)
# Give optparse.OptionParser an `add_argument` method for
# compatibility with argparse.ArgumentParser
try:
parser.add_argument = parser.add_option
except AttributeError:
pass
parser.add_argument('--bytes', dest='units', action='store_const',
const=('bytes', 1), default=('bits', 8),
help='Display values in bytes instead of bits. Does '
'not affect the image generated by --share')
parser.add_argument('--share', action='store_true',
help='Generate and provide a URL to the speedtest.net '
'share results image')
parser.add_argument('--simple', action='store_true',
help='Suppress verbose output, only show basic '
'information')
parser.add_argument('--list', action='store_true',
help='Display a list of speedtest.net servers '
'sorted by distance')
parser.add_argument('--server', help='Specify a server ID to test against')
parser.add_argument('--mini', help='URL of the Speedtest Mini server')
parser.add_argument('--source', help='Source IP address to bind to')
parser.add_argument('--version', action='store_true',
help='Show the version number and exit')

options = parser.parse_args()
if isinstance(options, tuple):
args = options[0]
else:
args = options
del options

# Print the version and exit
if args.version:
version()

# If specified bind to a specific IP address
if args.source:
source = args.source
socket.socket = bound_socket

if not args.simple:
print_('Retrieving speedtest.net configuration...')
try:
config = getConfig()
except URLError:
print_('Cannot retrieve speedtest configuration')
sys.exit(1)

if not args.simple:
print_('Retrieving speedtest.net server list...')
if args.list or args.server:
servers = closestServers(config['client'], True)
if args.list:
serverList = []
for server in servers:
line = ('%(id)4s) %(sponsor)s (%(name)s, %(country)s) '
'[%(d)0.2f km]' % server)
serverList.append(line)
# Python 2.7 and newer seem to be ok with the resultant encoding
# from parsing the XML, but older versions have some issues.
# This block should detect whether we need to encode or not
try:
unicode()
print_('\n'.join(serverList).encode('utf-8', 'ignore'))
except NameError:
print_('\n'.join(serverList))
except IOError:
pass
sys.exit(0)
else:
servers = closestServers(config['client'])

if not args.simple:
print_('Testing from %(isp)s (%(ip)s)...' % config['client'])

if args.server:
try:
best = getBestServer(filter(lambda x: x['id'] == args.server,
servers))
except IndexError:
print_('Invalid server ID')
sys.exit(1)
elif args.mini:
name, ext = os.path.splitext(args.mini)
if ext:
url = os.path.dirname(args.mini)
else:
url = args.mini
urlparts = urlparse(url)
try:
f = urlopen(args.mini)
except:
print_('Invalid Speedtest Mini URL')
sys.exit(1)
else:
text = f.read()
f.close()
extension = re.findall('upload_extension: "([^"]+)"', text.decode())
if not extension:
for ext in ['php', 'asp', 'aspx', 'jsp']:
try:
f = urlopen('%s/speedtest/upload.%s' % (args.mini, ext))
except:
pass
else:
data = f.read().strip()
if (f.code == 200 and
len(data.splitlines()) == 1 and
re.match('size=[0-9]', data)):
extension = [ext]
break
if not urlparts or not extension:
print_('Please provide the full URL of your Speedtest Mini server')
sys.exit(1)
servers = [{
'sponsor': 'Speedtest Mini',
'name': urlparts[1],
'd': 0,
'url': '%s/speedtest/upload.%s' % (url.rstrip('/'), extension[0]),
'latency': 0,
'id': 0
}]
try:
best = getBestServer(servers)
except:
best = servers[0]
else:
if not args.simple:
print_('Selecting best server based on latency...')
best = getBestServer(servers)

if not args.simple:
# Python 2.7 and newer seem to be ok with the resultant encoding
# from parsing the XML, but older versions have some issues.
# This block should detect whether we need to encode or not
try:
unicode()
print_(('Hosted by %(sponsor)s (%(name)s) [%(d)0.2f km]: '
'%(latency)s ms' % best).encode('utf-8', 'ignore'))
except NameError:
print_('Hosted by %(sponsor)s (%(name)s) [%(d)0.2f km]: '
'%(latency)s ms' % best)
else:
print_('Ping: %(latency)s ms' % best)

sizes = [350, 500, 750, 1000, 1500, 2000, 2500, 3000, 3500, 4000]
urls = []
for size in sizes:
for i in range(0, 4):
urls.append('%s/random%sx%s.jpg' %
(os.path.dirname(best['url']), size, size))
if not args.simple:
print_('Testing download speed', end='')
dlspeed = downloadSpeed(urls, args.simple)
if not args.simple:
print_()
print_('Download: %0.2f M%s/s' %
((dlspeed / 1000 / 1000) * args.units[1], args.units[0]))

sizesizes = [int(.25 * 1000 * 1000), int(.5 * 1000 * 1000)]
sizes = []
for size in sizesizes:
for i in range(0, 25):
sizes.append(size)
if not args.simple:
print_('Testing upload speed', end='')
ulspeed = uploadSpeed(best['url'], sizes, args.simple)
if not args.simple:
print_()
print_('Upload: %0.2f M%s/s' %
((ulspeed / 1000 / 1000) * args.units[1], args.units[0]))

if args.share and args.mini:
print_('Cannot generate a speedtest.net share results image while '
'testing against a Speedtest Mini server')
elif args.share:
dlspeedk = int(round((dlspeed / 1000) * 8, 0))
ping = int(round(best['latency'], 0))
ulspeedk = int(round((ulspeed / 1000) * 8, 0))

# Build the request to send results back to speedtest.net
# We use a list instead of a dict because the API expects parameters
# in a certain order
apiData = [
'download=%s' % dlspeedk,
'ping=%s' % ping,
'upload=%s' % ulspeedk,
'promo=',
'startmode=%s' % 'pingselect',
'recommendedserverid=%s' % best['id'],
'accuracy=%s' % 1,
'serverid=%s' % best['id'],
'hash=%s' % md5(('%s-%s-%s-%s' %
(ping, ulspeedk, dlspeedk, '297aae72'))
.encode()).hexdigest()]

req = Request('http://www.speedtest.net/api/api.php',
data='&'.join(apiData).encode())
req.add_header('Referer', 'http://c.speedtest.net/flash/speedtest.swf')
f = urlopen(req)
response = f.read()
code = f.code
f.close()

if int(code) != 200:
print_('Could not submit results to speedtest.net')
sys.exit(1)

qsargs = parse_qs(response.decode())
resultid = qsargs.get('resultid')
if not resultid or len(resultid) != 1:
print_('Could not submit results to speedtest.net')
sys.exit(1)

print_('Share results: http://www.speedtest.net/result/%s.png' %
resultid[0])


def main():
try:
speedtest()
except KeyboardInterrupt:
print_('\nCancelling...')


if __name__ == '__main__':
main()

# vim:ts=4:sw=4:expandtab

Last edited by Fixit7; 11-04-2014 at 10:36 PM.
 
Old 11-07-2014, 06:40 AM   #2
sag47
Senior Member
 
Registered: Sep 2009
Location: Raleigh, NC
Distribution: Ubuntu, PopOS, Raspbian
Posts: 1,899
Blog Entries: 36

Rep: Reputation: 477Reputation: 477Reputation: 477Reputation: 477Reputation: 477
Do you have a question? Also, why post the source inline when a link to the source would have sufficed?

SAM

Last edited by sag47; 11-07-2014 at 06:41 AM.
 
Old 11-07-2014, 12:08 PM   #3
Fixit7
Senior Member
 
Registered: Mar 2014
Location: El Lago, Texas
Distribution: Ubuntu_Mate 16.04
Posts: 1,374

Original Poster
Rep: Reputation: 169Reputation: 169
Quote:
Originally Posted by sag47 View Post
Do you have a question? Also, why post the source inline when a link to the source would have sufficed?

SAM
Just trying to be helpful. :-)



It included a script that creates a log file. The link to the python script did not.
 
Old 11-07-2014, 02:15 PM   #4
sag47
Senior Member
 
Registered: Sep 2009
Location: Raleigh, NC
Distribution: Ubuntu, PopOS, Raspbian
Posts: 1,899
Blog Entries: 36

Rep: Reputation: 477Reputation: 477Reputation: 477Reputation: 477Reputation: 477
It's certainly an interesting utility. Thanks for sharing.

What I meant about linking the script instead: if you link a script then if the person who maintains it writes bug fixes then a simple link will automatically benefit. If you quote the code inline then you could still have buggy code quoted when it was fixed upstream.

I tend to have a scripts repository for code I have written and use. https://github.com/samrocketman?tab=repositories (see drexel-university and home projects). If you find yourself having written a few or even one useful script then putting it in a repository is something to consider.
 
Old 11-07-2014, 04:39 PM   #5
Fixit7
Senior Member
 
Registered: Mar 2014
Location: El Lago, Texas
Distribution: Ubuntu_Mate 16.04
Posts: 1,374

Original Poster
Rep: Reputation: 169Reputation: 169
Look at the bash script I posted.

It has a link to the author.
 
  


Reply



Posting Rules
You may not post new threads
You may not post replies
You may not post attachments
You may not edit your posts

BB code is On
Smilies are On
[IMG] code is Off
HTML code is Off



Similar Threads
Thread Thread Starter Forum Replies Last Post
Test your PC's speed - namida12 MEPIS 11 10-26-2007 05:46 PM
speed test daashag Linux - Networking 4 03-30-2006 01:29 PM
speed test Yomaoni Linux - Software 4 07-18-2005 04:33 PM
Speed! DSL vs. T1 mangolicious Linux - Networking 5 10-03-2004 11:05 AM
hi speed dsl jason2 Linux - Networking 3 08-29-2004 01:50 PM

LinuxQuestions.org > Forums > Linux Forums > Linux - Distributions > Puppy

All times are GMT -5. The time now is 05:41 PM.

Main Menu
Advertisement
My LQ
Write for LQ
LinuxQuestions.org is looking for people interested in writing Editorials, Articles, Reviews, and more. If you'd like to contribute content, let us know.
Main Menu
Syndicate
RSS1  Latest Threads
RSS1  LQ News
Twitter: @linuxquestions
Open Source Consulting | Domain Registration