Add support to Fetcher for file: URLs.
This also supports bare absolute path names.

Testing Done:
Locally green: `./pants test tests/python/pants_test/net/http`

CI went green here:
  https://travis-ci.org/pantsbuild/pants/builds/146249850

Bugs closed: 3324, 3697

Reviewed at https://rbcommons.com/s/twitter/r/4099/
jsirois committed Jul 21, 2016
1 parent 4445752 commit 0450732
Showing 10 changed files with 212 additions and 38 deletions.
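For orientation, a minimal sketch (not part of the commit) of the URL forms the updated Fetcher accepts; the buildroot, URLs and archive paths below are made up:

  from pants.base.build_environment import get_buildroot
  from pants.net.http.fetcher import Fetcher

  fetcher = Fetcher(get_buildroot())

  # Remote URLs work as before.
  fetcher.download('https://example.org/tool.tar.gz', listener=Fetcher.ProgressListener())

  # New: file: URLs with absolute paths, and bare absolute paths, read straight from disk.
  fetcher.download('file:///opt/cache/tool.tar.gz')
  fetcher.download('/opt/cache/tool.tar.gz')

  # New: relative file: URLs are resolved against the root_dir passed to the constructor.
  fetcher.download('file://dist/tool.tar.gz')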
@@ -29,6 +29,7 @@ python_library(
   dependencies=[
     ':node_resolver_base',
     '3rdparty/python:six',
+    'src/python/pants/base:build_environment',
     'src/python/pants/base:exceptions',
     'src/python/pants/fs',
     'src/python/pants/subsystem',
@@ -10,6 +10,7 @@
 import shutil
 
 import six.moves.urllib.parse as urllib_parse
+from pants.base.build_environment import get_buildroot
 from pants.base.exceptions import TaskError
 from pants.fs.archive import archiver_for_path
 from pants.net.http.fetcher import Fetcher
@@ -53,10 +54,10 @@ def resolve_target(self, node_task, target, results_dir, node_paths):
                                           path=download_path))
 
       try:
-        Fetcher().download(target.dependencies_archive_url,
-                           listener=Fetcher.ProgressListener(),
-                           path_or_fd=download_path,
-                           timeout_secs=self.get_options().fetch_timeout_secs)
+        Fetcher(get_buildroot()).download(target.dependencies_archive_url,
+                                          listener=Fetcher.ProgressListener(),
+                                          path_or_fd=download_path,
+                                          timeout_secs=self.get_options().fetch_timeout_secs)
       except Fetcher.Error as error:
         raise TaskError('Failed to fetch preinstalled node_modules for {target} from {url}: {error}'
                         .format(target=target.address.reference(),
1 change: 1 addition & 0 deletions src/python/pants/binaries/BUILD
@@ -6,6 +6,7 @@ python_library(
   sources=['binary_util.py'],
   dependencies=[
     '3rdparty/python/twitter/commons:twitter.common.collections',
+    'src/python/pants/base:build_environment',
     'src/python/pants/base:exceptions',
     'src/python/pants/net',
     'src/python/pants/option',
3 changes: 2 additions & 1 deletion src/python/pants/binaries/binary_util.py
@@ -12,6 +12,7 @@
 
 from twitter.common.collections import OrderedSet
 
+from pants.base.build_environment import get_buildroot
 from pants.base.exceptions import TaskError
 from pants.net.http.fetcher import Fetcher
 from pants.subsystem.subsystem import Subsystem
@@ -159,7 +160,7 @@ def _select_binary_stream(self, name, binary_path, fetcher=None):
         logger.info('Attempting to fetch {name} binary from: {url} ...'.format(name=name, url=url))
         try:
           with temporary_file() as dest:
-            fetcher = fetcher or Fetcher()
+            fetcher = fetcher or Fetcher(get_buildroot())
             fetcher.download(url, listener=Fetcher.ProgressListener(), path_or_fd=dest)
             logger.info('Fetched {name} binary from: {url} .'.format(name=name, url=url))
             downloaded_successfully = True
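_select_binary_stream builds a default Fetcher(get_buildroot()) but still accepts a caller-supplied fetcher, and the Fetcher constructor (see fetcher.py below) takes any requests-api-like object. A hedged sketch of supplying a session-backed Fetcher; the header value and URL are illustrative only:

  import requests

  from pants.base.build_environment import get_buildroot
  from pants.net.http.fetcher import Fetcher

  # Any object conforming to the requests api works; a Session lets callers
  # share connection pooling or default headers across fetches.
  session = requests.Session()
  session.headers.update({'User-Agent': 'pants-binary-fetch'})  # illustrative header

  fetcher = Fetcher(get_buildroot(), requests_api=session)
  fetcher.download('https://binaries.example.org/bin/protoc', listener=Fetcher.ProgressListener())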
1 change: 1 addition & 0 deletions src/python/pants/ivy/BUILD
@@ -7,6 +7,7 @@ python_library(
   dependencies = [
     '3rdparty/python/twitter/commons:twitter.common.collections',
     '3rdparty/python:six',
+    'src/python/pants/base:build_environment',
     'src/python/pants/java/distribution',
     'src/python/pants/java:executor',
     'src/python/pants/java:util',
3 changes: 2 additions & 1 deletion src/python/pants/ivy/bootstrapper.py
@@ -10,6 +10,7 @@
 import os
 import shutil
 
+from pants.base.build_environment import get_buildroot
 from pants.ivy.ivy import Ivy
 from pants.ivy.ivy_subsystem import IvySubsystem
 from pants.net.http.fetcher import Fetcher
@@ -139,7 +140,7 @@ def _bootstrap_ivy(self, bootstrap_jar_path):
     options = self._ivy_subsystem.get_options()
     if not os.path.exists(bootstrap_jar_path):
       with temporary_file() as bootstrap_jar:
-        fetcher = Fetcher()
+        fetcher = Fetcher(get_buildroot())
         checksummer = fetcher.ChecksumListener(digest=hashlib.sha1())
         try:
           logger.info('\nDownloading {}'.format(options.bootstrap_jar_url))
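The bootstrapper pairs the download with a ChecksumListener; a rough sketch of that flow, which with this commit also works against a local jar via a file: URL. The jar path is made up, and the checksum property is assumed from the listener's construction around a hashlib digest:

  import hashlib

  from pants.base.build_environment import get_buildroot
  from pants.net.http.fetcher import Fetcher

  fetcher = Fetcher(get_buildroot())
  checksummer = fetcher.ChecksumListener(digest=hashlib.sha1())

  # Works the same for http(s) URLs and, after this change, file: URLs.
  fetcher.download('file:///opt/ivy/ivy-2.4.0.jar',  # made-up local jar path
                   listener=checksummer,
                   timeout_secs=2)
  print(checksummer.checksum)  # assumed: hex digest exposed once the fetch finishes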
1 change: 1 addition & 0 deletions src/python/pants/net/BUILD
@@ -8,5 +8,6 @@ python_library(
     '3rdparty/python:requests',
     '3rdparty/python:six',
     'src/python/pants/util:dirutil',
+    'src/python/pants/util:meta',
   ],
 )
167 changes: 139 additions & 28 deletions src/python/pants/net/http/fetcher.py
@@ -7,15 +7,18 @@
 
 import hashlib
 import os
+import re
 import sys
 import tempfile
 import time
+from abc import abstractmethod, abstractproperty
 from contextlib import closing, contextmanager
 
 import requests
 import six
 
 from pants.util.dirutil import safe_open
+from pants.util.meta import AbstractClass
 
 
 class Fetcher(object):
@@ -51,8 +54,6 @@ def response_code(self):
       """
       return self._response_code
 
-  _TRANSIENT_EXCEPTION_TYPES = (requests.ConnectionError, requests.Timeout)
-
   class Listener(object):
     """A listener callback interface for HTTP GET requests made by a Fetcher."""
 
@@ -191,14 +192,132 @@ def finished(self):
       sys.stdout.write(' {:.3f}s\n'.format(time.time() - self._start))
       sys.stdout.flush()
 
-  def __init__(self, requests_api=None):
+  def __init__(self, root_dir, requests_api=None):
     """Creates a Fetcher that uses the given requests api object.
     By default uses the requests module, but can be any object conforming to the requests api like
     a requests Session object.
+    :param root_dir: The root directory to find relative local `file://` url paths against.
+    :param requests_api: An optional requests api-like object.
     """
+    self._root_dir = root_dir
     self._requests = requests_api or requests
 
+  class _Response(AbstractClass):
+    """Abstracts a fetch response."""
+
+    @abstractproperty
+    def status_code(self):
+      """The HTTP status code for the fetch.
+      :rtype: int
+      """
+
+    @abstractproperty
+    def size(self):
+      """The size of the fetched file in bytes if known; otherwise, `None`.
+      :rtype: int
+      :raises :class:`Fetcher.Error` if there is a problem determining the file size.
+      """
+
+    @abstractmethod
+    def iter_content(self, chunk_size_bytes):
+      """Return an iterator over the content of the fetched file's bytes.
+      :rtype: :class:`collections.Iterator` over byte chunks.
+      :raises :class:`Fetcher.Error` if there is a problem determining the file size.
+      """
+
+    @abstractmethod
+    def close(self):
+      """Close the underlying fetched file stream."""
+
+  class _RequestsResponse(_Response):
+    _TRANSIENT_EXCEPTION_TYPES = (requests.ConnectionError, requests.Timeout)
+
+    @classmethod
+    def as_fetcher_error(cls, url, e):
+      exception_factory = (Fetcher.TransientError if isinstance(e, cls._TRANSIENT_EXCEPTION_TYPES)
+                           else Fetcher.PermanentError)
+      return exception_factory('Problem GETing data from {}: {}'.format(url, e))
+
+    def __init__(self, url, resp):
+      self._url = url
+      self._resp = resp
+
+    @property
+    def status_code(self):
+      return self._resp.status_code
+
+    @property
+    def size(self):
+      size = self._resp.headers.get('content-length')
+      return int(size) if size else None
+
+    def iter_content(self, chunk_size_bytes):
+      try:
+        return self._resp.iter_content(chunk_size=chunk_size_bytes)
+      except requests.RequestException as e:
+        raise self.as_fetcher_error(self._url, e)
+
+    def close(self):
+      self._resp.close()
+
+  class _LocalFileResponse(_Response):
+    def __init__(self, fp):
+      self._fp = fp
+
+    @property
+    def status_code(self):
+      return requests.codes.ok
+
+    @property
+    def size(self):
+      try:
+        stat = os.fstat(self._fp.fileno())
+        return stat.st_size
+      except OSError as e:
+        raise Fetcher.PermanentError('Problem stating {} for its size: {}'.format(self._fp.name, e))
+
+    def iter_content(self, chunk_size_bytes):
+      while True:
+        try:
+          data = self._fp.read(chunk_size_bytes)
+        except IOError as e:
+          raise Fetcher.PermanentError('Problem reading chunk from {}: {}'.format(self._fp.name, e))
+        if not data:
+          break
+        yield data
+
+    def close(self):
+      self._fp.close()
+
+  def _as_local_file_path(self, url):
+    path = re.sub(r'^//', '', url.lstrip('file:'))
+    if path.startswith('/'):
+      return path
+    elif url.startswith('file:'):
+      return os.path.join(self._root_dir, path)
+    else:
+      return None
+
+  def _fetch(self, url, timeout_secs=None):
+    path = self._as_local_file_path(url)
+    if path:
+      try:
+        fp = open(path, 'rb')
+        return self._LocalFileResponse(fp)
+      except IOError as e:
+        raise self.PermanentError('Problem reading data from {}: {}'.format(path, e))
+    else:
+      try:
+        resp = self._requests.get(url, stream=True, timeout=timeout_secs, allow_redirects=True)
+        return self._RequestsResponse(url, resp)
+      except requests.RequestException as e:
+        raise self._RequestsResponse.as_fetcher_error(url, e)
 
   def fetch(self, url, listener, chunk_size_bytes=None, timeout_secs=None):
     """Fetches data from the given URL notifying listener of all lifecycle events.
@@ -208,35 +327,27 @@ def fetch(self, url, listener, chunk_size_bytes=None, timeout_secs=None):
     :param timeout_secs: the maximum time to wait for data to be available, 1 second by default
     :raises: Fetcher.Error if there was a problem fetching all data from the given url
     """
+    chunk_size_bytes = chunk_size_bytes or 10 * 1024
+    timeout_secs = timeout_secs or 1.0
+
     if not isinstance(listener, self.Listener):
       raise ValueError('listener must be a Listener instance, given {}'.format(listener))
 
-    try:
-      with closing(self._requests.get(url, stream=True, timeout=timeout_secs,
-                                      allow_redirects=True)) as resp:
-        if resp.status_code != requests.codes.ok:
-          listener.status(resp.status_code)
-          raise self.PermanentError('GET request to {} failed with status code {}'
-                                    .format(url, resp.status_code),
-                                    response_code=resp.status_code)
-
-        size = resp.headers.get('content-length')
-        listener.status(resp.status_code, content_length=int(size) if size else None)
-        chunk_size_bytes = chunk_size_bytes or 10 * 1024
-        timeout_secs = timeout_secs or 1.0
-
-        read_bytes = 0
-        for data in resp.iter_content(chunk_size=chunk_size_bytes):
-          listener.recv_chunk(data)
-          read_bytes += len(data)
-        if size and read_bytes != int(size):
-          raise self.Error('Expected {} bytes, read {}'.format(size, read_bytes))
-        listener.finished()
-    except requests.RequestException as e:
-      exception_factory = (self.TransientError if isinstance(e, self._TRANSIENT_EXCEPTION_TYPES)
-                           else self.PermanentError)
-      raise exception_factory('Problem GETing data from {}: {}'.format(url, e))
+    with closing(self._fetch(url, timeout_secs=timeout_secs)) as resp:
+      if resp.status_code != requests.codes.ok:
+        listener.status(resp.status_code)
+        raise self.PermanentError('Fetch of {} failed with status code {}'
+                                  .format(url, resp.status_code),
+                                  response_code=resp.status_code)
+      listener.status(resp.status_code, content_length=resp.size)
+
+      read_bytes = 0
+      for data in resp.iter_content(chunk_size_bytes=chunk_size_bytes):
+        listener.recv_chunk(data)
+        read_bytes += len(data)
+      if resp.size and read_bytes != resp.size:
+        raise self.Error('Expected {} bytes, read {}'.format(resp.size, read_bytes))
+      listener.finished()
 
   def download(self, url, listener=None, path_or_fd=None, chunk_size_bytes=None, timeout_secs=None):
     """Downloads data from the given URL.
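A short sketch of the mapping _as_local_file_path implements, plus a local fetch through the public API; the root_dir and file paths are illustrative:

  from pants.net.http.fetcher import Fetcher

  fetcher = Fetcher('/home/user/repo')  # root_dir, normally the buildroot

  # Per _as_local_file_path above:
  #   'file:///etc/hosts'     -> '/etc/hosts'                      (absolute file: URL)
  #   '/etc/hosts'            -> '/etc/hosts'                      (bare absolute path)
  #   'file://dist/a.tar.gz'  -> '/home/user/repo/dist/a.tar.gz'   (relative, joined to root_dir)
  #   'https://example.org/a' -> None                              (falls through to requests)

  # A local fetch goes through _LocalFileResponse; no HTTP request is made.
  fetcher.download('file:///etc/hosts', path_or_fd='/tmp/hosts.copy')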
1 change: 1 addition & 0 deletions tests/python/pants_test/net/http/BUILD
@@ -10,5 +10,6 @@ python_tests(
     '3rdparty/python:six',
     'src/python/pants/net',
     'src/python/pants/util:contextutil',
+    'src/python/pants/util:dirutil',
   ]
 )
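The new dirutil dependency hints that the added tests exercise file: URLs against temporary directories. A hedged sketch of such a test, not the commit's actual test code (names and layout are illustrative):

  import os
  import unittest

  from pants.net.http.fetcher import Fetcher
  from pants.util.contextutil import temporary_dir
  from pants.util.dirutil import safe_open


  class LocalFetchSketchTest(unittest.TestCase):
    def test_fetch_relative_file_url(self):
      with temporary_dir() as root:
        # Lay out a file under a made-up buildroot.
        with safe_open(os.path.join(root, 'dist', 'contents.txt'), 'wb') as fp:
          fp.write(b'local bytes')

        dest = os.path.join(root, 'copy.txt')
        Fetcher(root).download('file://dist/contents.txt', path_or_fd=dest)

        with open(dest, 'rb') as fp:
          self.assertEqual(b'local bytes', fp.read())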