Source code for newrelic_telemetry_sdk.client

# Copyright 2019 New Relic, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import json
import logging
import uuid
import zlib

import urllib3
from urllib3.util import parse_url

try:
    from urllib.request import getproxies
except ImportError:
    from urllib import getproxies


# Module-level logger, named after this module.
_logger = logging.getLogger(__name__)

# Resolve the SDK version used in the User-Agent string; fall back to a
# placeholder when the version module cannot be imported.
try:
    from newrelic_telemetry_sdk.version import version as __version__
except ImportError:  # pragma: no cover
    __version__ = "unknown"

# Base User-Agent value sent by every client. Client.add_version_info appends
# additional "product/version" tokens to a client's user-agent header.
USER_AGENT = "NewRelic-Python-TelemetrySDK/{}".format(__version__)

# Names exported by a star-import of this module.
# NOTE(review): LogClient is defined in this module but is not listed here --
# confirm whether that omission is intentional.
__all__ = ("SpanClient", "MetricClient", "EventClient", "HTTPError", "HTTPResponse")


class HTTPError(ValueError):
    """Unexpected HTTP Status

    Raised by :meth:`HTTPResponse.raise_for_status` when the response status
    is not successful. The exception is constructed with two positional
    arguments, the status code and the response object, which are therefore
    available as ``exc.args[0]`` and ``exc.args[1]``.
    """
class HTTPResponse(urllib3.HTTPResponse):
    """A wrapper for urllib3.HTTPResponse, providing additional helper methods

    The wrapper does not initialise the urllib3 base class; it stores the
    wrapped response and forwards every attribute that is not found on the
    wrapper itself to that object.
    """

    def __init__(self, response):
        """Initialize the wrapper with an urllib3.HTTPResponse object

        :param response: The response to wrap.
        :type response: urllib3.HTTPResponse
        """
        self._response = response

    def __getattr__(self, name):
        """Expose attributes and methods of the original urllib3.HTTPResponse object

        Only called when normal attribute lookup on the wrapper fails.

        :raises AttributeError: if the wrapped response has no such attribute,
            or if the wrapper has no wrapped response yet.
        """
        # If ``_response`` itself is missing (instance created without
        # __init__, e.g. by copy/pickle), looking it up below would re-enter
        # __getattr__ forever. Fail with a normal AttributeError instead.
        if name == "_response":
            raise AttributeError(name)
        return getattr(self._response, name)

    def json(self):
        """Returns the json-encoded content of a response.

        The body is decoded as UTF-8 before being parsed.

        :rtype: dict
        """
        return json.loads(self.data.decode("utf-8"))

    @property
    def ok(self):
        """Return true if status code indicates success (200 <= status < 300)"""
        return 200 <= self.status < 300

    def raise_for_status(self):
        """Raise an exception for an unsuccessful HTTP status code

        :raises HTTPError: if response status is not successful. The exception
            carries the status code and this response as its arguments.
        """
        if not self.ok:
            raise HTTPError(self.status, self)
# No longer a subclass, kept for backwards compatibility
HTTPSConnectionPool = urllib3.HTTPSConnectionPool


class Client(object):
    """HTTP Client for interacting with New Relic APIs

    This class is used to send data to the New Relic APIs over HTTP. Payloads
    are JSON-encoded, gzip-compressed and POSTed to ``PATH`` on the configured
    host through a single connection pool.

    The pool is created with ``urllib3.Retry(total=False, redirect=0, ...)``.
    Per the urllib3 documentation ``total=False`` disables retries, so callers
    that need retry behaviour should implement it around :meth:`send_batch`.

    If the environment configures a proxy for ``https`` traffic (as reported
    by ``getproxies()``), requests are sent through it. Only ``http`` proxies
    are supported; a proxy URL without a scheme is treated as ``http``.

    :param license_key: New Relic license key
    :type license_key: str
    :param host: (optional) Override the host for the client.
    :type host: str
    :param port: (optional) Override the port for the client.
        Default: 443
    :type port: int

    Usage::

        >>> import os
        >>> license_key = os.environ.get("NEW_RELIC_LICENSE_KEY", "")
        >>> client = Client(license_key, host="metric-api.newrelic.com")
        >>> response = client.send({})
        >>> client.close()
    """

    # Connection pool class instantiated by __init__.
    POOL_CLS = HTTPSConnectionPool
    # Key under which items are placed in the JSON payload.
    PAYLOAD_TYPE = ""
    # Default host, used when no ``host`` argument is given.
    HOST = ""
    # Request path every payload is POSTed to.
    PATH = "/"
    # Base headers shared by all instances; each instance works on a copy.
    HEADERS = urllib3.make_headers(
        keep_alive=True, accept_encoding=True, user_agent=USER_AGENT
    )

    def __init__(self, license_key, host=None, port=443):
        host = host or self.HOST
        headers = self.HEADERS.copy()
        headers.update(
            {
                "Api-Key": license_key,
                "Content-Encoding": "gzip",
                "Content-Type": "application/json",
            }
        )
        retries = urllib3.Retry(
            total=False, connect=None, read=None, redirect=0, status=None
        )

        # Check if https traffic should be proxied and pass the proxy
        # information to the connectionpool
        proxies = getproxies()
        proxy = proxies.get("https", None)
        proxy_headers = None
        if proxy:
            proxy = parse_url(proxy)
            if proxy.scheme is None:
                # A proxy given as bare "host:port" parses without a scheme.
                # Treat it as an http proxy instead of failing on
                # ``None.lower()`` below.
                proxy = proxy._replace(scheme="http")
            _logger.info(
                "Using proxy host={0!r} port={1!r}".format(proxy.host, proxy.port)
            )
            if proxy.scheme.lower() != "http":
                _logger.warning(
                    "Contacting https destinations through "
                    "{} proxies is not supported.".format(proxy.scheme)
                )
                proxy = None
            elif proxy.auth:
                # https://tools.ietf.org/html/rfc7617
                #
                # The username/password encoding is not specified by a standard.
                # "this specification continues to leave the default encoding undefined"
                #
                # parse_url will encode non-ascii characters into a
                # percent-encoded string. As a result, we make the assumption
                # that anything returned from parse_url is utf-8 encoded.
                #
                # This is, of course, not guaranteed to be interpreted
                # correctly by the proxy server, but the failure mode will
                # hopefully be interpreted as an incorrect username/password
                # combination rather than causing a security issue where
                # information may be leaked (control characters, etc.)
                proxy_headers = urllib3.make_headers(proxy_basic_auth=proxy.auth)

        self._pool = self.POOL_CLS(
            host=host,
            port=port,
            retries=retries,
            headers=headers,
            _proxy=proxy,
            _proxy_headers=proxy_headers,
        )
        # Keep a handle on the pool's headers so add_version_info can extend
        # the user-agent and send_batch can copy them per request.
        self._headers = self._pool.headers

    def add_version_info(self, product, product_version):
        """Adds product and version information to a User-Agent header

        This method implements
        https://tools.ietf.org/html/rfc7231#section-5.5.3

        :param product: The product name using the SDK
        :type product: str
        :param product_version: The version string of the product in use
        :type product_version: str
        """
        product_ua_header = " {}/{}".format(product, product_version)
        self._headers["user-agent"] += product_ua_header

    def close(self):
        """Close all open connections and disable internal connection pool."""
        self._pool.close()

    @staticmethod
    def _compress_payload(payload):
        """Compress ``payload`` (bytes) and return the compressed bytes.

        ``wbits=31`` makes zlib emit a gzip container, matching the
        ``Content-Encoding: gzip`` header set in ``__init__``.
        """
        level = zlib.Z_DEFAULT_COMPRESSION
        compressor = zlib.compressobj(level, zlib.DEFLATED, 31)
        payload = compressor.compress(payload)
        payload += compressor.flush()
        return payload

    def _create_payload(self, items, common):
        """Build the compressed request body for ``items``.

        The body is a one-element JSON array holding an object that maps
        ``PAYLOAD_TYPE`` to ``items`` and, when ``common`` is truthy,
        ``"common"`` to ``common``.
        """
        payload = {self.PAYLOAD_TYPE: items}
        if common:
            payload["common"] = common

        payload = json.dumps([payload], separators=(",", ":"))
        if not isinstance(payload, bytes):
            payload = payload.encode("utf-8")

        return self._compress_payload(payload)

    def send(self, item, timeout=None):
        """Send a single item

        :param item: The object to send
        :type item: dict
        :param timeout: (optional) a timeout in seconds for sending the request
        :type timeout: int

        :rtype: HTTPResponse
        """
        return self.send_batch((item,), timeout=timeout)

    def send_batch(self, items, common=None, timeout=None):
        """Send a batch of items

        :param items: An iterable of items to send to New Relic.
        :type items: list or tuple
        :param common: (optional) A map of attributes that will be set on each
            item.
        :type common: dict
        :param timeout: (optional) a timeout in seconds for sending the request
        :type timeout: int

        :raises ValueError: if the pool returns something other than an
            ``urllib3.HTTPResponse``.
        :rtype: HTTPResponse
        """
        # Specifying the headers argument overrides any base headers existing
        # in the pool, so we must copy all existing headers
        headers = self._headers.copy()

        # Generate a unique request ID for this request
        headers["x-request-id"] = str(uuid.uuid4())

        payload = self._create_payload(items, common)
        urllib3_response = self._pool.urlopen(
            "POST", self.PATH, body=payload, headers=headers, timeout=timeout
        )
        if not isinstance(urllib3_response, urllib3.HTTPResponse):
            raise ValueError(
                "Expected urllib3.HTTPResponse, got {}".format(type(urllib3_response))
            )
        return HTTPResponse(urllib3_response)
class SpanClient(Client):
    """HTTP Client for interacting with the New Relic Span API

    Sends spans to the New Relic Span API over HTTP. Only the endpoint
    constants differ from :class:`Client`: items are placed under the
    ``"spans"`` payload key and posted to ``/trace/v1`` on
    ``trace-api.newrelic.com`` unless ``host`` is overridden.

    :param license_key: New Relic license key
    :type license_key: str
    :param host: (optional) Override the host for the span API
        endpoint.
    :type host: str
    :param port: (optional) Override the port for the client.
        Default: 443
    :type port: int

    Usage::

        >>> import os
        >>> license_key = os.environ.get("NEW_RELIC_LICENSE_KEY", "")
        >>> span_client = SpanClient(license_key)
        >>> response = span_client.send({})
        >>> span_client.close()
    """

    PAYLOAD_TYPE = "spans"
    PATH = "/trace/v1"
    HOST = "trace-api.newrelic.com"
class MetricClient(Client):
    """HTTP Client for interacting with the New Relic Metric API

    Sends metrics to the New Relic Metric API over HTTP. Only the endpoint
    constants differ from :class:`Client`: items are placed under the
    ``"metrics"`` payload key and posted to ``/metric/v1`` on
    ``metric-api.newrelic.com`` unless ``host`` is overridden.

    :param license_key: New Relic license key
    :type license_key: str
    :param host: (optional) Override the host for the metric API
        endpoint.
    :type host: str
    :param port: (optional) Override the port for the client.
        Default: 443
    :type port: int

    Usage::

        >>> import os
        >>> license_key = os.environ.get("NEW_RELIC_LICENSE_KEY", "")
        >>> metric_client = MetricClient(license_key)
        >>> response = metric_client.send({})
        >>> metric_client.close()
    """

    PAYLOAD_TYPE = "metrics"
    PATH = "/metric/v1"
    HOST = "metric-api.newrelic.com"
class EventClient(Client):
    """HTTP Client for interacting with the New Relic Event API

    Sends events to the New Relic Event API over HTTP. Unlike the base
    client, the request body is the JSON encoding of the items themselves,
    and common attributes are not part of the payload.

    :param license_key: New Relic license key
    :type license_key: str
    :param host: (optional) Override the host for the event API
        endpoint.
    :type host: str
    :param port: (optional) Override the port for the client.
        Default: 443
    :type port: int

    Usage::

        >>> import os
        >>> license_key = os.environ.get("NEW_RELIC_LICENSE_KEY", "")
        >>> event_client = EventClient(license_key)
        >>> response = event_client.send({})
        >>> event_client.close()
    """

    PATH = "/v1/accounts/events"
    HOST = "insights-collector.newrelic.com"

    def _create_payload(self, items, common):
        """Serialize ``items`` to JSON and compress the result.

        ``common`` is accepted to match the base-class signature but is not
        used.
        """
        serialized = json.dumps(items)
        if isinstance(serialized, bytes):
            raw = serialized
        else:
            raw = serialized.encode("utf-8")
        return self._compress_payload(raw)

    def send_batch(self, items, timeout=None):
        """Send a batch of items

        :param items: An iterable of items to send to New Relic.
        :type items: list or tuple
        :param timeout: (optional) a timeout in seconds for sending the request
        :type timeout: int

        :rtype: HTTPResponse
        """
        # Delegate to the base implementation with no common attributes.
        parent = super(EventClient, self)
        return parent.send_batch(items, None, timeout=timeout)
class LogClient(Client):
    """HTTP Client for interacting with the New Relic Log API

    This class is used to send log messages to the New Relic Log API over HTTP.
    Items are placed under the ``"logs"`` payload key and posted to
    ``/log/v1`` on ``log-api.newrelic.com`` unless ``host`` is overridden.

    :param license_key: New Relic license key
    :type license_key: str
    :param host: (optional) Override the host for the log API
        endpoint.
    :type host: str
    :param port: (optional) Override the port for the client.
        Default: 443
    :type port: int

    Usage::

        >>> import os
        >>> license_key = os.environ.get("NEW_RELIC_LICENSE_KEY", "")
        >>> log_client = LogClient(license_key)
        >>> response = log_client.send({})
        >>> log_client.close()
    """

    HOST = "log-api.newrelic.com"
    PATH = "/log/v1"
    PAYLOAD_TYPE = "logs"