Skip to content
This repository has been archived by the owner on Feb 10, 2025. It is now read-only.

A Simpler Fix to the Streaming Code due to Changes from Twitter on Jan. 13, 2014. #196

Merged
merged 27 commits into from
Feb 3, 2014
Merged
Show file tree
Hide file tree
Changes from 6 commits
Commits
Show all changes
27 commits
Select commit Hold shift + click to select a range
128ec04
Ignore .idea files from PyCharm.
adonoho Jan 14, 2014
8631806
Do not send gzip headers for streaming calls. (ref: RouxRC [73efaca])
adonoho Jan 15, 2014
ff3ca19
Fix the uri extension by attribute name.
adonoho Jan 15, 2014
25ea832
Change some default attributes and add TwitterHTTPError (ref: RouxRC …
adonoho Jan 15, 2014
d488eec
Test for delimiters in the stream and removes them. Add comments.
adonoho Jan 15, 2014
95d4980
Update to use OAuth, take in command line arguments and modify the im…
adonoho Jan 15, 2014
d908997
Move the potentially uninitialized values out of the if test.
adonoho Jan 16, 2014
ef99d73
Increase the size of the read buffer to be larger than the average tw…
adonoho Jan 16, 2014
7333aa5
Add support for both user and site streams.
adonoho Jan 17, 2014
57aa6d8
Bring HTTP chunk downloading into its own separate method.
adonoho Jan 17, 2014
54555a7
Cosmetic edit
adonoho Jan 20, 2014
b8cdd54
Merge branch 'fix-stream' into pr-fix-stream
adonoho Jan 20, 2014
c0fc741
Minimize string decoding and move to use a bytearray for the buffer. …
adonoho Jan 23, 2014
02bce53
Cosmetic edits.
adonoho Jan 24, 2014
db75126
Merge branch 'fix-stream' into pr-fix-stream
adonoho Jan 24, 2014
2693800
Move recv_chunk() into a stand alone function. Further minimize memor…
adonoho Jan 26, 2014
e28a1da
Move variables out of the iterator class and into the generator funct…
adonoho Jan 26, 2014
23dcd46
As Twitter appears to send complete JSON in the chunks, we can simpli…
adonoho Jan 27, 2014
28a8ef6
Further refine socket management.
adonoho Jan 28, 2014
cd2fbdf
Bump the version number.
adonoho Jan 28, 2014
c20d1a8
Remove all keep-alive delimiters to allow the hangup patch to function.
adonoho Jan 28, 2014
0d92536
Remove socket timeout mutation code.
adonoho Jan 28, 2014
b01fa3f
Set a timeout on the main sample stream to test that code path.
adonoho Jan 28, 2014
443e409
Handle HTTP chunks that only contain keep-alive delimiters.
adonoho Jan 28, 2014
3e782f6
Add comments detailing why we can avoid handling certain edge cases i…
adonoho Jan 29, 2014
a8880f9
Clarify the comment about edge cases.
adonoho Jan 29, 2014
12bb62d
Merge branch 'fix-stream' into pr-fix-stream
adonoho Jan 30, 2014
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -8,3 +8,4 @@ twitter3.egg-info
*~
dist
build
.idea
11 changes: 6 additions & 5 deletions twitter/api.py
Original file line number Diff line number Diff line change
Expand Up @@ -128,7 +128,7 @@ class TwitterCall(object):

def __init__(
self, auth, format, domain, callable_cls, uri="",
uriparts=None, secure=True, timeout=None):
uriparts=None, secure=True, timeout=None, gzip=False):
self.auth = auth
self.format = format
self.domain = domain
Expand All @@ -137,6 +137,7 @@ def __init__(
self.uriparts = uriparts
self.secure = secure
self.timeout = timeout
self.gzip = gzip

def __getattr__(self, k):
try:
Expand All @@ -145,9 +146,9 @@ def __getattr__(self, k):
def extend_call(arg):
return self.callable_cls(
auth=self.auth, format=self.format, domain=self.domain,
callable_cls=self.callable_cls, timeout=self.timeout, uriparts=self.uriparts \
+ (arg,),
secure=self.secure)
callable_cls=self.callable_cls, timeout=self.timeout,
secure=self.secure, gzip=self.gzip,
uriparts=self.uriparts + (arg,))
if k == "_":
return extend_call
else:
Expand Down Expand Up @@ -194,7 +195,7 @@ def __call__(self, **kwargs):
uriBase = "http%s://%s/%s%s%s" %(
secure_str, self.domain, uri, dot, self.format)

headers = {'Accept-Encoding': 'gzip'}
headers = {'Accept-Encoding': 'gzip'} if self.gzip else dict()
if self.auth:
headers.update(self.auth.generate_headers())
arg_data = self.auth.encode_params(uriBase, method, kwargs)
Expand Down
15 changes: 11 additions & 4 deletions twitter/stream.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@ class TwitterJSONIter(object):
def __init__(self, handle, uri, arg_data, block=True, timeout=None):
self.decoder = json.JSONDecoder()
self.handle = handle
self.uri = uri
self.arg_data = arg_data
self.buf = b""
self.block = block
self.timeout = timeout
Expand All @@ -34,6 +36,9 @@ def __iter__(self):
while True:
try:
utf8_buf = self.buf.decode('utf8').lstrip()
if utf8_buf and utf8_buf[0] != '{': # Remove the hex delimiter length and extra whitespace.
utf8_buf = utf8_buf.lstrip('0123456789abcdefABCDEF')
utf8_buf = utf8_buf.lstrip()
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I need to try it out, but I'm worried that doing it this way could break the delimited=true argument of the Twitter API.

res, ptr = self.decoder.raw_decode(utf8_buf)
self.buf = utf8_buf[ptr:].encode('utf8')
yield wrap_response(res, self.handle.headers)
Expand All @@ -44,8 +49,8 @@ def __iter__(self):
pass
else:
yield None
except urllib_error.HTTPError as e:
raise TwitterHTTPError(e, uri, self.format, arg_data)
except urllib_error.HTTPError as e: # Probably unnecessary, no dynamic url calls in the try block.
raise TwitterHTTPError(e, self.uri, 'json', self.arg_data)
# this is a non-blocking read (ie, it will return if any data is available)
try:
if self.timeout:
Expand All @@ -57,13 +62,15 @@ def __iter__(self):
else:
yield {"timeout":True}
else:
self.buf += sock.recv(1024)
self.buf += sock.recv(1024) # As tweets are typically longer than 1KB, consider increasing this size.
except SSLError as e:
if (not self.block or self.timeout) and (e.errno == 2):
# Apparently this means there was nothing in the socket buf
pass
else:
raise
except urllib_error.HTTPError as e:
raise TwitterHTTPError(e, self.uri, 'json', self.arg_data)

def handle_stream_response(req, uri, arg_data, block, timeout=None):
handle = urllib_request.urlopen(req,)
Expand Down Expand Up @@ -119,4 +126,4 @@ def __init__(
TwitterStreamCall.__init__(
self, auth=auth, format="json", domain=domain,
callable_cls=call_cls,
secure=secure, uriparts=uriparts, timeout=timeout)
secure=secure, uriparts=uriparts, timeout=timeout, gzip=False)
41 changes: 30 additions & 11 deletions twitter/stream_example.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,25 +4,39 @@

USAGE

twitter-stream-example <username> <password>
stream-example -t <token> -ts <token_secret> -ck <consumer_key> -cs <consumer_secret>

"""

from __future__ import print_function

import sys
import argparse

from .stream import TwitterStream
from .auth import UserPassAuth
from .util import printNicely
from twitter.stream import TwitterStream
from twitter.oauth import OAuth
from twitter.util import printNicely

def main(args=sys.argv[1:]):
if not args[1:]:
print(__doc__)
return 1

# When using twitter stream you must authorize. UserPass or OAuth.
stream = TwitterStream(auth=UserPassAuth(args[0], args[1]))
def parse_arguments():

parser = argparse.ArgumentParser()

parser.add_argument('-t', '--token', help='The Twitter Access Token.')
parser.add_argument('-ts', '--token_secret', help='The Twitter Access Token Secret.')
parser.add_argument('-ck', '--consumer_key', help='The Twitter Consumer Key.')
parser.add_argument('-cs', '--consumer_secret', help='The Twitter Consumer Secret.')

return parser.parse_args()

## parse_arguments()


def main():

args = parse_arguments()

# When using twitter stream you must authorize.
stream = TwitterStream(auth=OAuth(args.token, args.token_secret, args.consumer_key, args.consumer_secret))

# Iterate over the sample stream.
tweet_iter = stream.statuses.sample()
Expand All @@ -31,3 +45,8 @@ def main(args=sys.argv[1:]):
# or data message.
if tweet.get('text'):
printNicely(tweet['text'])

## main()

if __name__ == '__main__':
main()