This repository has been archived by the owner on Feb 10, 2025. It is now read-only.

Commit

Merge pull request #196 from adonoho/pr-fix-stream
A Simpler Fix to the Streaming Code due to Changes from Twitter on Jan. 13, 2014.
Gentlefolk,

This is a candidate release patch. I propose that it become the formal branch of this library and have dubbed it v1.10.3. I once again formally thank RouxRC for his efforts moving this library forward. Any errors in this patch remain mine and do not reflect upon RouxRC or his code.

This library is a high-performance streaming library. Compared to other Twitter libraries, it is easily an order of magnitude faster at delivering tweets to your application. Why is that? When streaming, this library pierces Python's urllib abstraction and takes control of the socket, interpreting the HTTP stream directly. That makes it fast. It also makes it vulnerable to protocol changes, and it needed to be upgraded when Twitter upgraded the protocol version.

Twitter's switch to HTTP v1.1 was long overdue. 
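
For readers unfamiliar with the technique, here is a minimal sketch (my illustration, not part of the patch) of what "piercing Python's urllib abstraction" means. The private attribute paths are the same ones used in `twitter/stream.py` below and are not part of urllib's public API:

```python
import sys
import urllib.request

def raw_socket_from(handle):
    """Return the socket beneath a urllib response object (sketch only).

    Relies on CPython implementation details; the attribute paths mirror
    those used in twitter/stream.py in the diff below.
    """
    if sys.version_info >= (3, 0):
        return handle.fp.raw._sock
    return handle.fp._sock.fp._sock

# Usage sketch (OAuth headers omitted; `prepared_streaming_request` is a placeholder):
#   handle = urllib.request.urlopen(prepared_streaming_request)
#   sock = raw_socket_from(handle)
#   raw_bytes = sock.recv(1024)   # read the HTTP stream directly off the wire
```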

Summary of changes:

- Based upon RouxRC's code, I turned off gzip compression. My version is slightly different from RouxRC's.
- Instead of incrementally reading arbitrary lengths of bytes from the socket and testing whether they parse as JSON (a good technique), the switch to HTTP chunking forced us to process chunk-sized blocks. Based upon inspection, Twitter never sends partial JSON in a chunk, and keep-alive delimiters arrive as single 7-byte chunks. This code depends upon both of these observations. It does not do general-purpose HTTP chunk processing; it is a Twitter-specific HTTP chunk parser (see the framing sketch after this list).
- Chunk-oriented processing allowed me to isolate stream interpretation to the chunk code and migrate the wrapper code to operate exclusively on strings. This makes the wrapper code more readable.
- Once I had opened up the wrapper code, I cleaned it up. This involved modest edits to how certain socket parameters are determined and moving data used only by the generator out of the containing object and into the generator itself.
- As this is exclusively socket-oriented code, the HTTP exception catching was removed from the generator and moved to wrap urllib's opening of the socket.
- Because the data is read in larger chunks and, hence, run through the JSON parser less often, this code is about 10% faster than the prior generation.
- When Twitter hangs up on us, this code emits a `hangup` message in the stream (see the consumption sketch after this list).
- This code has been tested using Python v2.7.6 and v3.3.3 on OS X 10.8.5 (Mountain Lion). I have tested it on the high-volume sample stream and on a user stream under both versions of Python. It is believed, but not tested, that it will also function under Python v2.6.x: it uses the bytearray type, which I believe is available back to v2.6.x. As the code is not particularly tricky, I do not foresee it introducing any new issues that were not already apparent in this library.
- I use this patch in production and have captured 50M+ tweets with it. It is solid and reliable. If you find it not to be so, please contact me; I have a vested interest in ensuring that it catches all corner cases.
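
For concreteness, here is a small sketch (my illustration, not part of the patch) of the keep-alive framing the chunk parser relies on. The byte layout, a hex size of `2`, a CRLF, a two-byte CRLF payload, and a trailing CRLF, is an assumption inferred from the 7-byte figure above and from the `remaining <= 2` branch of `recv_chunk` in the diff:

```python
# Assumed wire layout of a Twitter keep-alive: one complete 7-byte HTTP chunk.
keep_alive = b'2\r\n\r\n\r\n'
crlf = keep_alive.find(b'\r\n')                      # end of the hex chunk-size field
remaining = int(keep_alive[:crlf], 16)               # 2 bytes of payload
payload = keep_alive[crlf + 2:crlf + 2 + remaining]  # the payload is just CRLF
assert len(keep_alive) == 7 and payload == b'\r\n'
```

And here is a sketch of consuming the resulting iterator while reacting to the `hangup` and `timeout` messages described above (the credentials are placeholders; the imports match `twitter/stream_example.py` below):

```python
from twitter.oauth import OAuth
from twitter.stream import TwitterStream

auth = OAuth('<token>', '<token_secret>', '<consumer_key>', '<consumer_secret>')
stream = TwitterStream(auth=auth, timeout=60.0)

for msg in stream.statuses.sample():
    if msg is None:             # only yielded by non-blocking streams; skip it
        continue
    if msg.get('hangup'):       # Twitter closed the connection
        break
    if msg.get('timeout'):      # no data arrived within the timeout window
        continue
    if msg.get('text'):         # an actual tweet (other messages lack 'text')
        print(msg['text'])
```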

Thank you for your patience while I refine this patch, and I ask Mr. Verdone to select it as the basis for moving this library forward.

Enjoy and Anon,
Andrew
sixohsix committed Feb 3, 2014
2 parents 31dd1a0 + 12bb62d commit 40834b5
Showing 5 changed files with 111 additions and 52 deletions.
1 change: 1 addition & 0 deletions .gitignore
@@ -8,3 +8,4 @@ twitter3.egg-info
*~
dist
build
.idea
2 changes: 1 addition & 1 deletion setup.py
@@ -1,7 +1,7 @@
from setuptools import setup, find_packages
import sys, os

version = '1.10.2'
version = '1.10.3'

install_requires = [
# -*- Extra requirements: -*-
13 changes: 7 additions & 6 deletions twitter/api.py
@@ -128,7 +128,7 @@ class TwitterCall(object):

def __init__(
self, auth, format, domain, callable_cls, uri="",
uriparts=None, secure=True, timeout=None):
uriparts=None, secure=True, timeout=None, gzip=False):
self.auth = auth
self.format = format
self.domain = domain
@@ -137,6 +137,7 @@ def __init__(
self.uriparts = uriparts
self.secure = secure
self.timeout = timeout
self.gzip = gzip

def __getattr__(self, k):
try:
@@ -145,9 +146,9 @@ def __getattr__(self, k):
def extend_call(arg):
return self.callable_cls(
auth=self.auth, format=self.format, domain=self.domain,
callable_cls=self.callable_cls, timeout=self.timeout, uriparts=self.uriparts \
+ (arg,),
secure=self.secure)
callable_cls=self.callable_cls, timeout=self.timeout,
secure=self.secure, gzip=self.gzip,
uriparts=self.uriparts + (arg,))
if k == "_":
return extend_call
else:
@@ -194,13 +195,13 @@ def __call__(self, **kwargs):
uriBase = "http%s://%s/%s%s%s" %(
secure_str, self.domain, uri, dot, self.format)

headers = {'Accept-Encoding': 'gzip'}
headers = {'Accept-Encoding': 'gzip'} if self.gzip else dict()
body = None; arg_data = None
if self.auth:
headers.update(self.auth.generate_headers())
arg_data = self.auth.encode_params(uriBase, method, kwargs)
if method == 'GET':
uriBase += '?' + arg_data
body = None
else:
body = arg_data.encode('utf8')

94 changes: 61 additions & 33 deletions twitter/stream.py
@@ -12,61 +12,89 @@

from .api import TwitterCall, wrap_response, TwitterHTTPError

def recv_chunk(sock): # -> bytearray:

buf = sock.recv(8) # Scan for an up to 16MiB chunk size (0xffffff).
crlf = buf.find(b'\r\n') # Find the HTTP chunk size.

if crlf > 0: # If there is a length, then process it

remaining = int(buf[:crlf], 16) # Decode the chunk size.

start = crlf + 2 # Add in the length of the header's CRLF pair.
end = len(buf) - start

chunk = bytearray(remaining)

if remaining <= 2: # E.g. an HTTP chunk with just a keep-alive delimiter.
chunk[:remaining] = buf[start:start + remaining]
# There are several edge cases (remaining == [3-6]) as the chunk size exceeds the length
# of the initial read of 8 bytes. With Twitter, these do not, in practice, occur. The
# shortest JSON message starts with '{"limit":{'. Hence, it exceeds in size the edge cases
# and eliminates the need to address them.
else: # There is more to read in the chunk.
chunk[:end] = buf[start:]
chunk[end:] = sock.recv(remaining - end)
sock.recv(2) # Read the trailing CRLF pair. Throw it away.

return chunk

return bytearray()

## recv_chunk()


class TwitterJSONIter(object):

def __init__(self, handle, uri, arg_data, block=True, timeout=None):
self.decoder = json.JSONDecoder()
self.handle = handle
self.buf = b""
self.uri = uri
self.arg_data = arg_data
self.block = block
self.timeout = timeout
self.timer = time.time()


def __iter__(self):
if sys.version_info >= (3, 0):
sock = self.handle.fp.raw._sock
else:
sock = self.handle.fp._sock.fp._sock
sock = self.handle.fp.raw._sock if sys.version_info >= (3, 0) else self.handle.fp._sock.fp._sock
sock.setsockopt(socket.SOL_SOCKET, socket.SO_KEEPALIVE, 1)
if not self.block or self.timeout:
sock.setblocking(False)
sock.setblocking(self.block and not self.timeout)
buf = u''
json_decoder = json.JSONDecoder()
timer = time.time()
while True:
try:
utf8_buf = self.buf.decode('utf8').lstrip()
res, ptr = self.decoder.raw_decode(utf8_buf)
self.buf = utf8_buf[ptr:].encode('utf8')
buf = buf.lstrip()
res, ptr = json_decoder.raw_decode(buf)
buf = buf[ptr:]
yield wrap_response(res, self.handle.headers)
self.timer = time.time()
timer = time.time()
continue
except ValueError as e:
if self.block:
pass
else:
yield None
except urllib_error.HTTPError as e:
raise TwitterHTTPError(e, uri, self.format, arg_data)
# this is a non-blocking read (ie, it will return if any data is available)
if self.block: pass
else: yield None
try:
buf = buf.lstrip() # Remove any keep-alive delimiters to detect hangups.
if self.timeout:
ready_to_read = select.select([sock], [], [], self.timeout)
if ready_to_read[0]:
self.buf += sock.recv(1024)
if time.time() - self.timer > self.timeout:
yield {"timeout":True}
else:
yield {"timeout":True}
buf += recv_chunk(sock).decode('utf-8') # This is a non-blocking read.
if time.time() - timer > self.timeout:
yield {'timeout': True}
else: yield {'timeout': True}
else:
self.buf += sock.recv(1024)
buf += recv_chunk(sock).decode('utf-8')
if not buf and self.block:
yield {'hangup': True}
except SSLError as e:
if (not self.block or self.timeout) and (e.errno == 2):
# Apparently this means there was nothing in the socket buf
pass
else:
raise
# Error from a non-blocking read of an empty buffer.
if (not self.block or self.timeout) and (e.errno == 2): pass
else: raise

def handle_stream_response(req, uri, arg_data, block, timeout=None):
handle = urllib_request.urlopen(req,)
try:
handle = urllib_request.urlopen(req,)
except urllib_error.HTTPError as e:
raise TwitterHTTPError(e, uri, 'json', arg_data)
return iter(TwitterJSONIter(handle, uri, arg_data, block, timeout=timeout))

class TwitterStreamCallWithTimeout(TwitterCall):
@@ -119,4 +147,4 @@ def __init__(
TwitterStreamCall.__init__(
self, auth=auth, format="json", domain=domain,
callable_cls=call_cls,
secure=secure, uriparts=uriparts, timeout=timeout)
secure=secure, uriparts=uriparts, timeout=timeout, gzip=False)
53 changes: 41 additions & 12 deletions twitter/stream_example.py
@@ -4,30 +4,59 @@
USAGE
twitter-stream-example <username> <password>
stream-example -t <token> -ts <token_secret> -ck <consumer_key> -cs <consumer_secret>
"""

from __future__ import print_function

import sys
import argparse

from .stream import TwitterStream
from .auth import UserPassAuth
from .util import printNicely
from twitter.stream import TwitterStream
from twitter.oauth import OAuth
from twitter.util import printNicely

def main(args=sys.argv[1:]):
if not args[1:]:
print(__doc__)
return 1

# When using twitter stream you must authorize. UserPass or OAuth.
stream = TwitterStream(auth=UserPassAuth(args[0], args[1]))
def parse_arguments():

parser = argparse.ArgumentParser()

parser.add_argument('-t', '--token', help='The Twitter Access Token.')
parser.add_argument('-ts', '--token_secret', help='The Twitter Access Token Secret.')
parser.add_argument('-ck', '--consumer_key', help='The Twitter Consumer Key.')
parser.add_argument('-cs', '--consumer_secret', help='The Twitter Consumer Secret.')
parser.add_argument('-us', '--user_stream', action='store_true', help='Connect to the user stream endpoint.')
parser.add_argument('-ss', '--site_stream', action='store_true', help='Connect to the site stream endpoint.')

return parser.parse_args()

## parse_arguments()


def main():

args = parse_arguments()

# When using twitter stream you must authorize.
auth = OAuth(args.token, args.token_secret, args.consumer_key, args.consumer_secret)
if args.user_stream:
stream = TwitterStream(auth=auth, domain='userstream.twitter.com')
tweet_iter = stream.user()
elif args.site_stream:
stream = TwitterStream(auth=auth, domain='sitestream.twitter.com')
tweet_iter = stream.site()
else:
stream = TwitterStream(auth=auth, timeout=60.0)
tweet_iter = stream.statuses.sample()

# Iterate over the sample stream.
tweet_iter = stream.statuses.sample()
for tweet in tweet_iter:
# You must test that your tweet has text. It might be a delete
# or data message.
if tweet.get('text'):
printNicely(tweet['text'])

## main()

if __name__ == '__main__':
main()
