Source code for newrelic_telemetry_sdk.metric_batch

# Copyright 2019 New Relic, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import threading
import time


[docs]class MetricBatch(object): """Maps a metric identity to its aggregated value This is used to hold unfinalized metrics for further aggregation until they are flushed to a backend. :param tags: (optional) A dictionary of tags to attach to all flushes. :type tags: dict """ LOCK_CLS = threading.Lock def __init__(self, tags=None): self._interval_start = int(time.time() * 1000.0) self._lock = self.LOCK_CLS() self._batch = {} self._timestamps = {} tags = tags and dict(tags) self._common = {} if tags: self._common["attributes"] = tags
[docs] @staticmethod def create_identity(name, tags=None, typ=None): """Creates a deterministic hashable identity for a metric :param name: The name of the metric. :type name: str :param tags: (optional) A set of tags that can be used to filter this metric in the New Relic UI. :type tags: dict :param typ: (optional) The metric type. One of "summary", "count", "gauge" or None. Default: None (gauge type). :type typ: str """ if tags: tags = frozenset(tags.items()) return (typ, name, tags)
[docs] def record_gauge(self, name, value, tags=None): """Records a gauge metric :param name: The name of the metric. :type name: str :param value: The metric value. :type value: int or float :param tags: (optional) A set of tags that can be used to filter this metric in the New Relic UI. :type tags: dict """ identity = self.create_identity(name, tags) with self._lock: self._batch[identity] = value self._timestamps[identity] = int(time.time() * 1000.0)
[docs] def record_count(self, name, value, tags=None): """Records a count metric :param name: The name of the metric. :type name: str :param value: The metric value. :type value: int or float :param tags: (optional) A set of tags that can be used to filter this metric in the New Relic UI. :type tags: dict """ identity = self.create_identity(name, tags, "count") with self._lock: self._batch[identity] = self._batch.get(identity, 0) + value
[docs] def record_summary(self, name, value, tags=None): """Records a summary metric :param name: The name of the metric. :type name: str :param value: The metric value. :type value: int or float :param tags: (optional) A set of tags that can be used to filter this metric in the New Relic UI. :type tags: dict """ identity = self.create_identity(name, tags, "summary") with self._lock: if identity in self._batch: merged_value = self._batch[identity] merged_value["count"] += 1 merged_value["sum"] += value merged_value["min"] = min(value, merged_value["min"]) merged_value["max"] = max(value, merged_value["max"]) else: value = {"count": 1, "sum": value, "min": value, "max": value} self._batch[identity] = value
[docs] def flush(self): """Flush all metrics from the batch This method returns all metrics in the batch and a common block representing timestamp as the start time for the period since creation or last flush, and interval representing the total amount of time in milliseconds between flushes. As a side effect, the batch's interval is reset in anticipation of subsequent calls to flush. :returns: A tuple of (metrics, common) :rtype: tuple """ with self._lock: batch = self._batch timestamps = self._timestamps items = [] for identity, value in batch.items(): metric = {} typ, name, tags = identity metric["name"] = name if typ: metric["type"] = typ else: metric["timestamp"] = timestamps[identity] if tags: metric["attributes"] = dict(tags) metric["value"] = value items.append(metric) items = tuple(items) batch.clear() timestamps.clear() common = self._common.copy() common["timestamp"] = self._interval_start now = int(time.time() * 1000.0) interval = now - self._interval_start common["interval.ms"] = interval self._interval_start = now return items, common