lib_shim_message-shim_index.js

/*
 * Copyright 2020 New Relic Corporation. All rights reserved.
 * SPDX-License-Identifier: Apache-2.0
 */

'use strict'

const genericRecorder = require('../../metrics/recorders/generic')
const logger = require('../../logger').child({ component: 'MessageShim' })
const TransactionShim = require('../transaction-shim')
const Shim = require('../shim') // For Shim.defineProperty
const util = require('util')
const { _nameMessageSegment } = require('./common')
const createRecorder = require('./consume')
const createSubscriberWrapper = require('./subscribe-consume')
const specs = require('../specs')

/**
 * Enumeration of well-known message brokers.
 *
 * @readonly
 * @memberof MessageShim
 * @enum {string}
 */
const LIBRARY_NAMES = {
  IRONMQ: 'IronMQ',
  KAFKA: 'Kafka',
  RABBITMQ: 'RabbitMQ',
  SNS: 'SNS',
  SQS: 'SQS'
}

/**
 * Mapping of well-known message brokers to their distributed tracing transport
 * type.
 *
 * @private
 * @readonly
 * @enum {string}
 */
const LIBRARY_TRANSPORT_TYPES = {
  AMQP: TransactionShim.TRANSPORT_TYPES.AMQP,
  IronMQ: TransactionShim.TRANSPORT_TYPES.IRONMQ,
  Kafka: TransactionShim.TRANSPORT_TYPES.KAFKA,
  RabbitMQ: TransactionShim.TRANSPORT_TYPES.AMQP
}

/**
 * Enumeration of possible message broker destination types.
 *
 * @readonly
 * @typedef {Object<string, string>} MessageShimTypes
 * @memberof MessageShim
 * @enum {string}
 */
const DESTINATION_TYPES = {
  EXCHANGE: 'Exchange',
  QUEUE: 'Queue',
  TOPIC: 'Topic'
}

/**
 * Constructs a shim specialized for instrumenting message brokers.
 *
 * @class
 * @augments TransactionShim
 * @classdesc Used for instrumenting message broker client libraries.
 * @param {Agent} agent The agent this shim will use.
 * @param {string} moduleName The name of the module being instrumented.
 * @param {string} resolvedName The full path to the loaded module.
 * @param {string} shimName Used to persist shim ids across different shim instances.
 * @param {string} pkgVersion version of module
 * @see Shim
 * @see TransactionShim
 */
function MessageShim(agent, moduleName, resolvedName, shimName, pkgVersion) {
  TransactionShim.call(this, agent, moduleName, resolvedName, shimName, pkgVersion)
  this._logger = logger.child({ module: moduleName })
  this._metrics = null
  this._transportType = TransactionShim.TRANSPORT_TYPES.UNKNOWN
}
module.exports = MessageShim
util.inherits(MessageShim, TransactionShim)

// Add constants on the shim for message broker libraries.
MessageShim.LIBRARY_NAMES = LIBRARY_NAMES
Object.keys(LIBRARY_NAMES).forEach(function defineLibraryEnum(libName) {
  Shim.defineProperty(MessageShim, libName, LIBRARY_NAMES[libName])
  Shim.defineProperty(MessageShim.prototype, libName, LIBRARY_NAMES[libName])
})

// Add constants to the shim for message broker destination types.
MessageShim.DESTINATION_TYPES = DESTINATION_TYPES
Object.keys(DESTINATION_TYPES).forEach(function defineTypesEnum(type) {
  Shim.defineProperty(MessageShim, type, DESTINATION_TYPES[type])
  Shim.defineProperty(MessageShim.prototype, type, DESTINATION_TYPES[type])
})

MessageShim.prototype.setLibrary = setLibrary
MessageShim.prototype.recordProduce = recordProduce
MessageShim.prototype.recordConsume = recordConsume
MessageShim.prototype.recordPurgeQueue = recordPurgeQueue
MessageShim.prototype.recordSubscribedConsume = recordSubscribedConsume

// -------------------------------------------------------------------------- //

/**
 * @callback MessageFunction
 * @summary
 *  Used for determining information about a message either being produced or
 *  consumed.
 * @param {MessageShim} shim
 *  The shim this function was handed to.
 * @param {Function} func
 *  The produce method or message consumer.
 * @param {string} name
 *  The name of the producer or consumer.
 * @param {Array.<*>} args
 *  The arguments being passed into the produce method or consumer.
 * @returns {specs.MessageSpec} The specification for the message being produced or
 *  consumed.
 * @see MessageShim#recordProduce
 * @see MessageShim#recordConsume
 */

/**
 * @callback MessageHandlerFunction
 * @summary
 *  A function that is used to extract properties from a consumed message. This
 *  method is handed the results of a consume call. If the consume used a
 *  callback, then this method will receive the arguments to the callback. If
 *  the consume used a promise, then this method will receive the resolved
 *  value.
 * @param {MessageShim} shim
 *  The shim this function was handed to.
 * @param {Function} func
 *  The produce method or message consumer.
 * @param {string} name
 *  The name of the producer or consumer.
 * @param {Array|*} args
 *  Either the arguments for the consumer callback function or the result of
 *  the resolved consume promise, depending on the mode of the instrumented
 *  method.
 * @returns {specs.MessageSpec} The extracted properties of the consumed message.
 * @see MessageShim#recordConsume
 */

/**
 * @callback MessageConsumerWrapperFunction
 * @summary
 *  Function that is used to wrap message consumer functions. Used alongside
 *  the MessageShim#recordSubscribedConsume API method.
 * @param {MessageShim} shim
 *  The shim this function was handed to.
 * @param {Function} consumer
 *  The message consumer to wrap.
 * @param {string} name
 *  The name of the consumer method.
 * @param {string} queue
 *  The name of the queue this consumer is being subscribed to.
 * @returns {Function} The consumer method, possibly wrapped.
 * @see MessageShim#recordSubscribedConsume
 * @see MessageShim#recordConsume
 */

// -------------------------------------------------------------------------- //

/**
 * Sets the vendor of the message broker being instrumented.
 *
 * This is used to generate the names for metrics and segments. If a string is
 * passed, metric names will be generated using that.
 *
 * @memberof MessageShim.prototype
 * @param {MessageShim.LIBRARY_NAMES|string} library
 *  The name of the message broker library. Use one of the well-known constants
 *  listed in {@link MessageShim.LIBRARY_NAMES} if available for the library.
 * @see MessageShim.LIBRARY_NAMES
 */
function setLibrary(library) {
  this._metrics = {
    PREFIX: 'MessageBroker/',
    LIBRARY: library,
    PRODUCE: 'Produce/',
    CONSUME: 'Consume/',
    PURGE: 'Purge/',
    NAMED: 'Named/',
    TEMP: 'Temp'
  }

  if (LIBRARY_TRANSPORT_TYPES[library]) {
    this._transportType = LIBRARY_TRANSPORT_TYPES[library]
  }

  this._logger = this._logger.child({ library })
  this.logger.trace({ metrics: this._metrics }, 'Library metric names set')
}

/**
 * Wraps the given properties as message producing methods to be recorded.
 *
 * - `recordProduce(nodule, properties, recordNamer)`
 * - `recordProduce(func, recordNamer)`
 *
 * The resulting wrapped methods will record their executions using the messaging
 * `PRODUCE` metric.
 *
 * @memberof MessageShim.prototype
 * @param {object | Function} nodule
 *  The source for the properties to wrap, or a single function to wrap.
 * @param {string|Array.<string>} [properties]
 *  One or more properties to wrap. If omitted, the `nodule` parameter is
 *  assumed to be the function to wrap.
 * @param {MessageFunction} recordNamer
 *  A function which specifies details of the message.
 * @returns {object | Function} The first parameter to this function, after
 *  wrapping it or its properties.
 * @see Shim#wrap
 * @see Shim#record
 * @see specs.MessageSpec
 * @see MessageFunction
 */
function recordProduce(nodule, properties, recordNamer) {
  if (this.isFunction(properties)) {
    // recordProduce(func, recordNamer)
    recordNamer = properties
    properties = null
  }

  return this.record(nodule, properties, function recordProd(shim) {
    const msgDesc = recordNamer.apply(this, arguments)
    if (!msgDesc) {
      return null
    }

    msgDesc.name = _nameMessageSegment(shim, msgDesc, shim._metrics.PRODUCE)
    if (!shim.agent.config.message_tracer.segment_parameters.enabled) {
      delete msgDesc.parameters
    } else if (msgDesc.routingKey) {
      msgDesc.parameters = shim.setDefaults(msgDesc.parameters, {
        routing_key: msgDesc.routingKey
      })
    }

    msgDesc.inContext = function generateCATHeaders() {
      if (msgDesc.headers) {
        shim.insertCATRequestHeaders(msgDesc.headers, true)
      }

      if (msgDesc.messageHeaders) {
        // Some message broker clients, like kafkajs, allow for sending multiple
        // messages in one send. Clients are likely to pick up such messages
        // individually. Thus, we need to propagate any distributed trace
        // headers to every message in the payload.
        msgDesc.messageHeaders((headers, altNames = true) => {
          shim.insertCATRequestHeaders(headers, altNames)
        })
      }
    }
    msgDesc.recorder = genericRecorder
    return msgDesc
  })
}
/**
 *
 * Wraps the given properties as message consumers to be recorded.
 *
 * - `recordConsume(nodule, properties, spec)`
 * - `recordConsume(func, spec)`
 *
 * The resulting wrapped methods will record their executions using the messaging
 * `CONSUME` metric, possibly also starting a message transaction. Note that
 * this should wrap the message _consumer_, to record methods which subscribe
 * consumers see {@link MessageShim#recordSubscribedConsume}
 *
 * @memberof MessageShim.prototype
 * @param {object | Function} nodule
 *  The source for the properties to wrap, or a single function to wrap.
 * @param {string|Array.<string>} [properties]
 *  One or more properties to wrap. If omitted, the `nodule` parameter is
 *  assumed to be the function to wrap.
 * @param {specs.MessageSpec|MessageFunction} spec
 *  The spec for the method or a function which returns the details of the
 *  method.
 * @returns {object | Function} The first parameter to this function, after
 *  wrapping it or its properties.
 * @see Shim#wrap
 * @see Shim#record
 * @see MessageShim#recordSubscribedConsume
 * @see specs.MessageSpec
 * @see MessageFunction
 */
function recordConsume(nodule, properties, spec) {
  if (this.isObject(properties) && !this.isArray(properties)) {
    // recordConsume(func, spec)
    spec = properties
    properties = null
  }

  return this.record(nodule, properties, function wrapConsume(shim, fn, fnName, args) {
    return createRecorder.call(this, { spec, shim, fn, fnName, args })
  })
}

/**
 * Wraps the given properties as queue purging methods.
 *
 * - `recordPurgeQueue(nodule, properties, spec)`
 * - `recordPurgeQueue(func, spec)`
 *
 * @memberof MessageShim.prototype
 * @param {object | Function} nodule
 *  The source for the properties to wrap, or a single function to wrap.
 * @param {string|Array.<string>} [properties]
 *  One or more properties to wrap. If omitted, the `nodule` parameter is
 *  assumed to be the function to wrap.
 * @param {specs.MessageSpec} spec
 *  The specification for this queue purge method's interface.
 * @param {string} spec.queue
 *  The name of the queue being purged.
 * @returns {object | Function} The first parameter to this function, after
 *  wrapping it or its properties.
 * @see Shim#wrap
 * @see Shim#record
 * @see RecorderSpec
 */
function recordPurgeQueue(nodule, properties, spec) {
  if (!nodule) {
    this.logger.debug('Not wrapping non-existent nodule.')
    return nodule
  }

  // Sort out the parameters.
  if (!this.isString(properties) && !this.isArray(properties)) {
    // recordPurgeQueue(nodule, spec)
    spec = properties
    properties = null
  }

  const specIsFunction = this.isFunction(spec)

  return this.record(nodule, properties, function purgeRecorder(shim, fn, name, args) {
    let descriptor = spec
    if (specIsFunction) {
      descriptor = spec.apply(this, arguments)
    }

    let queue = descriptor.queue
    if (shim.isNumber(queue)) {
      const queueIdx = shim.normalizeIndex(args.length, descriptor.queue)
      queue = args[queueIdx]
    }

    descriptor.name = _nameMessageSegment(
      shim,
      {
        destinationType: shim.QUEUE,
        destinationName: queue
      },
      shim._metrics.PURGE
    )
    descriptor.recorder = genericRecorder
    return descriptor
  })
}

/**
 * Wraps the given properties as message subscription methods.
 *
 * - `recordSubscribedConsume(nodule, properties, spec)`
 * - `recordSubscribedConsume(func, spec)`
 *
 * Message subscriber methods are ones used to register a message consumer with
 * the message library. See {@link MessageShim#recordConsume} for recording
 * the consumer itself.
 *
 * Note that unlike most `shim.recordX` methods, this method will call the
 * `spec.wrapper` method even if no transaction is active.
 *
 * @memberof MessageShim.prototype
 * @param {object | Function} nodule
 *  The source for the properties to wrap, or a single function to wrap.
 * @param {string|Array.<string>} [properties]
 *  One or more properties to wrap. If omitted, the `nodule` parameter is
 *  assumed to be the function to wrap.
 * @param {specs.MessageSubscribeSpec} spec
 *  The specification for this subscription method's interface.
 * @returns {object | Function} The first parameter to this function, after
 *  wrapping it or its properties.
 * @see Shim#wrap
 * @see Shim#record
 * @see MessageShim#recordConsume
 * @see specs.MessageSubscribeSpec
 */
function recordSubscribedConsume(nodule, properties, spec) {
  if (!nodule) {
    this.logger.debug('Not wrapping non-existent nodule.')
    return nodule
  }

  // Sort out the parameters.
  if (this.isObject(properties) && !this.isArray(properties)) {
    // recordSubscribedConsume(nodule, spec)
    spec = properties
    properties = null
  }

  // Must wrap the subscribe method independently to ensure that we can wrap
  // the consumer regardless of transaction state.
  const wrapped = this.wrap(nodule, properties, function wrapSubscribe(shim, fn, name) {
    if (!shim.isFunction(fn)) {
      return fn
    }

    return createSubscriberWrapper.call(this, { shim, fn, spec, name })
  })

  // Wrap the subscriber with segment creation.
  return this.record(wrapped, properties, function recordSubscribe(shim, fn, name, args) {
    if (shim.isFunction(spec)) {
      spec = spec.call(this, shim, fn, name, args)
    }
    // Make sure the specified consumer and callback indexes do not overlap.
    // This could happen for instance if the function signature is
    // `fn(consumer [, callback])` and specified as `consumer: shim.FIRST`,
    // `callback: shim.LAST`.
    const consumerIdx = shim.normalizeIndex(args.length, spec.consumer)
    let cbIdx = shim.normalizeIndex(args.length, spec.callback)
    if (cbIdx === consumerIdx) {
      cbIdx = null
    }

    return new specs.RecorderSpec({
      name: spec.name || name,
      callback: cbIdx,
      promise: spec.promise,
      parameters: spec.parameters,
      stream: false,
      internal: false
    })
  })
}