Skip to content

Commit

Permalink
cats effect 3: pass transaction info by implicits
Browse files Browse the repository at this point in the history
  • Loading branch information
richard-gibson committed Dec 1, 2021
1 parent 0da09b0 commit 4fe7fee
Show file tree
Hide file tree
Showing 5 changed files with 230 additions and 107 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ public static void attemptExpireTokenRefCount(AgentBridge.TokenAndRefCount token
}

public static void setThreadTokenAndRefCount(AgentBridge.TokenAndRefCount tokenAndRefCount) {
if (tokenAndRefCount != null) {
if (tokenAndRefCount != null && tokenAndRefCount.token != null) {
logTokenInfo(tokenAndRefCount, "setting token to thread");
AgentBridge.activeToken.set(tokenAndRefCount);
tokenAndRefCount.token.link();
Expand Down
Original file line number Diff line number Diff line change
@@ -1,12 +1,16 @@
package com.newrelic.cats3.api

import cats.effect.Sync
import com.newrelic.api.agent.NewRelic
import com.newrelic.api.agent.weaver.Weaver
import com.newrelic.api.agent.weaver.scala.{ScalaMatchType, ScalaWeave}

@ScalaWeave(`type` = ScalaMatchType.Object, `originalName` = "com.newrelic.cats3.api.TraceOps")
class TraceOps_Instrumentation {
def txn[S, F[_]:Sync](body: F[S]): F[S] = {
Util.wrapTrace(Weaver.callOriginal)
// def txnInfo: TxnInfo = Weaver.callOriginal()


def txn[S, F[_]:Sync](body: TxnInfo => F[S]): F[S] = {
Util.wrapTrace(body)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -2,32 +2,46 @@ package com.newrelic.cats3.api

import cats.effect.Sync
import cats.implicits._
import com.newrelic.agent.bridge.{AgentBridge, ExitTracer, Transaction, TracedMethod}
import com.newrelic.agent.bridge.{AgentBridge, ExitTracer, Token, Transaction}
import com.newrelic.api.agent.NewRelic

import java.util.concurrent.atomic.AtomicInteger

object Util {
val RETURN_OPCODE = 176

def wrapTrace[S, F[_] : Sync](body: F[S]): F[S] =
Sync[F].delay(AgentBridge.instrumentation.createScalaTxnTracer())
.redeemWith(_ => body,
tracer =>
if (tracer == null) {
body
} else {
for {
txnWithTracedMethod <- Sync[F].delay {
val agent = AgentBridge.getAgent
(agent.getTransaction(false), agent.getTracedMethod)
}
_ <- setupTokenAndRefCount(txnWithTracedMethod)
res <- attachErrorEvent(body, tracer)
_ <- cleanupTxnAndTokenRefCount(txnWithTracedMethod._1)
_ <- Sync[F].delay(tracer.finish(RETURN_OPCODE, null))
} yield res
}
)
def wrapTrace[S, F[_] : Sync](body: TxnInfo => F[S]): F[S] =
Sync[F].delay {
val tracer = AgentBridge.instrumentation.createScalaTxnTracer()
(tracer, optTxnInfo())
}.redeemWith(
_ => body(txnInfo),
tracerAndTxnInfo => {
val (tracer, optTxnInfo) = tracerAndTxnInfo
for {
_ <- Sync[F].delay(AgentBridge.getAgent.getTransaction(false))
updatedTxnInfo <- setupTokenAndRefCount(optTxnInfo, txnInfo)
res <- attachErrorEvent(body(updatedTxnInfo), tracer)
_ <- cleanupTxnAndTokenRefCount(updatedTxnInfo)
_ <- Sync[F].delay(tracer.finish(RETURN_OPCODE, null))
} yield res
}
)

private def txnInfo = {
val txn = NewRelic.getAgent.getTransaction
TxnInfo(txn, txn.getToken)
}

def optTxnInfo(): (Transaction, Token) = {
val txn = AgentBridge.getAgent.getTransaction(false)
if (txn != null) {
(txn, txn.getToken)
} else {
null
}
}


private def attachErrorEvent[S, F[_] : Sync](body: F[S], tracer: ExitTracer): F[S] =
body
Expand All @@ -36,23 +50,29 @@ object Util {
Sync[F].raiseError(throwable)
})

private def setupTokenAndRefCount[F[_] : Sync](txnWithTracerMethod: (Transaction, TracedMethod)): F[Unit] = Sync[F].delay {

val (txn, tracedMethod) = txnWithTracerMethod
if (txn != null && tracedMethod != null) {
AgentBridge.activeToken.set(
new AgentBridge.TokenAndRefCount(
txn.getToken,
tracedMethod,
new AtomicInteger(0)
))
private def setupTokenAndRefCount[F[_] : Sync](optTxn: (Transaction, Token), fallback: => TxnInfo)
: F[TxnInfo] =
Sync[F].delay {
if (optTxn != null) {
val (txn, token) = optTxn
AgentBridge.activeToken.set(
new AgentBridge.TokenAndRefCount(
token,
AgentBridge.getAgent.getTracedMethod,
new AtomicInteger(0)
)
)
TxnInfo(txn, token)
} else {
fallback
}
}
}

private def cleanupTxnAndTokenRefCount[F[_] : Sync](txn: Transaction): F[Unit] = Sync[F].delay {
AgentBridge.activeToken.remove()
if (txn != null) {
txn.expireAllTokens()
private def cleanupTxnAndTokenRefCount[F[_] : Sync](txnInfo: TxnInfo): F[Unit] = Sync[F].delay {
val tokenAndRefCount = AgentBridge.activeToken.get()
if (tokenAndRefCount != null) {
AgentBridge.activeToken.remove()
}
txnInfo.transaction.asInstanceOf[Transaction].expireAllTokens()
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -2,35 +2,40 @@ package com.newrelic.cats3.api

import cats.effect.Sync
import cats.implicits._
import com.newrelic.api.agent.{NewRelic, Segment, Token}

import com.newrelic.api.agent.{NewRelic, Segment, Token, Transaction}

case class TxnInfo(transaction: Transaction, token: Token)
object TraceOps {

private def txnInfo: TxnInfo = {
val txn = NewRelic.getAgent.getTransaction
TxnInfo(txn, txn.getToken)
}
/**
* Creates a segment to capture metrics for a given block of code, this will call {@link com.newrelic.api.agent.Transaction# startSegment ( String )},
* execute the code block, then call {@link com.newrelic.api.agent.Segment# end ( )}. This {@link Segment} will show up in the Transaction Breakdown
* table, as well as the Transaction Trace page. This {@link Segment} will be reported in the "Custom/" metric
* e.g. The code below will produce 1 segment <i>trace segment name</i>
* <pre>
* trace("trace segment name") {
* val i = 1
* val j = 2
* i + j
* }
* txn { implicit txnInfo =>
* trace("trace segment name") {
* val i = 1
* val j = 2
* i + j
* }
* }
* </pre>
*
* @param segmentName Name of the { @link Segment} segment in APM.
* This name will show up in the Transaction Breakdown table, as well as the Transaction Trace page.
* <p>
* if null or an empty String, the agent will report "Unnamed Segment".
* @param block Code block segment is to capture metrics for
* @param txnInfo implicit Transaction and Token segment is attached to
* @tparam S Type returned from executed code block
* @return Value returned by executed code block
*/
def trace[S](segmentName: String)(block: => S): S = {
val txn = NewRelic.getAgent.getTransaction()
val segment = txn.startSegment(segmentName)
def trace[S](segmentName: String)(block: => S)(implicit txnInfo: TxnInfo): S = {
val segment = txnInfo.transaction.startSegment(segmentName)
try {
block
} finally {
Expand All @@ -46,10 +51,12 @@ object TraceOps {
* This {@link Segment} will show up in the Transaction Breakdown table, as well as the Transaction Trace page. This {@link Segment} will be reported in the "Custom/" metric
* e.g. The code below will produce 2 segments <i>trace segment 1</i> and <i>trace segment 2</i>
* <pre>
* for {
* i <- asyncTrace("trace segment 1")(IO(1))
* j <- asyncTrace("trace segment 2")(IO(i + 1))
* } yield j
* txn { implicit txnInfo =>
* for {
* i <- asyncTrace("trace segment 1")(IO(1))
* j <- asyncTrace("trace segment 2")(IO(i + 1))
* } yield j
* }
* </pre>
*
* @param segmentName Name of the { @link Segment} segment in APM.
Expand All @@ -58,10 +65,11 @@ object TraceOps {
* if null or an empty String, the agent will report "Unnamed Segment".
* @param block F[S] value the segment is to capture metrics for.
* The block should return a { @link IO}
* @param txnInfo implicit Transaction and Token segment is attached to
* @return Value returned from completed asynchronous code block
*/
def asyncTrace[S, F[_] : Sync](segmentName: String)(block: F[S]): F[S] = for {
segment <- startSegment(segmentName)
def asyncTrace[S, F[_] : Sync](segmentName: String)(block: F[S])(implicit txnInfo: TxnInfo): F[S] = for {
segment <- Sync[F].delay(txnInfo.transaction.startSegment(segmentName))
res <- endSegmentOnError(block, segment)
_ <- Sync[F].delay(segment.end())
} yield res
Expand All @@ -73,24 +81,26 @@ object TraceOps {
* table, as well as the Transaction Trace page. This {@link Segment} will be reported in the "Custom/" metric
* e.g. the code below will produce a segment <i>trace map segment</i>
* <pre>
* IO(1)
* .map(traceFun("trace map segment")(i => i + 1))
* .filter(traceFun("trace filter segment")(i => i % 2 == 0))
* txn { implicit txnInfo =>
* IO(1)
* .map(traceFun("trace map 1 segment")(i => i + 1))
* .map(traceFun("trace map 2 segment")(i => i + 2))
* }
* </pre>
*
* @param segmentName Name of the { @link Segment} segment in APM.
* This name will show up in the Transaction Breakdown table, as well as the Transaction Trace page.
* <p>
* if null or an empty String, the agent will report "Unnamed Segment".
* @param f Function segment is to capture metrics for.
* @param txnInfo implicit Transaction and Token segment is attached to
* @tparam T Input type for function segment is to capture metrics for.
* @tparam S Type returned from executed function
* @return Value returned from executed function
*/
def traceFun[T, S](segmentName: String)(f: T => S): T => S = {
def traceFun[T, S](segmentName: String)(f: T => S)(implicit txnInfo: TxnInfo): T => S = {
t: T =>
val txn = NewRelic.getAgent.getTransaction()
val segment = txn.startSegment(segmentName)
val segment = txnInfo.transaction.startSegment(segmentName)
try {
f(t)
} finally {
Expand All @@ -106,21 +116,24 @@ object TraceOps {
* This {@link Segment} will show up in the Transaction Breakdown table, as well as the Transaction Trace page. This {@link Segment} will be reported in the "Custom/" metric
* e.g. The code below will produce 1 segment <i>trace flatMap segment</i>
* <pre>
* IO(1).flatMap(asyncTraceFun("trace flatMap segment")(i => IO(i + 1)))
* txn { implicit txnInfo =>
* IO(1).flatMap(asyncTraceFun("trace flatMap segment")(i => IO(i + 1)))
* }
* </pre>
*
* @param segmentName Name of the { @link Segment} segment in APM.
* This name will show up in the Transaction Breakdown table, as well as the Transaction Trace page.
* <p>
* if null or an empty String, the agent will report "Unnamed Segment".
* @param f Asynchronous function segment is to capture metrics for.
* @param txnInfo implicit Transaction and Token segment is attached to
* @tparam T Input type for function segment is to capture metrics for.
* @tparam S Type returned from completed asynchronous function
* @return Value returned from completed asynchronous function
*/
def asyncTraceFun[T, S, F[_] : Sync](segmentName: String)(f: T => F[S]): T => F[S] = { t: T =>
def asyncTraceFun[T, S, F[_] : Sync](segmentName: String)(f: T => F[S])(implicit txnInfo: TxnInfo): T => F[S] = { t: T =>
for {
segment <- startSegment(segmentName)
segment <- Sync[F].delay(txnInfo.transaction.startSegment(segmentName))
evaluatedFunc <- endSegmentOnError(f(t), segment)
_ <- Sync[F].delay(segment.end())
} yield evaluatedFunc
Expand All @@ -133,23 +146,18 @@ object TraceOps {
* The newly created {@link com.newrelic.api.agent.Transaction} will complete once the code block has been executed
* e.g. the code below will create a Transaction and with 2 segments <i>trace map IO</i> and <i>trace filter IO</i>
* <pre>
* txn {
* IO(1).map(traceFun("trace map IO")(i => i + 1))
* }
* txn { implicit txnInfo =>
* IO(1).map(traceFun("trace map IO")(i => i + 1))
* }
* </pre>
*
* @param block Code block to be executed inside a transaction
* @tparam S Type returned by code block
* @return Value returned by code block
*/
def txn[S, F[_] : Sync](body: F[S]): F[S] = body

private def startSegment[F[_] : Sync](segmentName: String): F[Segment] = Sync[F].delay {
val txn = NewRelic.getAgent.getTransaction()
txn.startSegment(segmentName)
}
def txn[S, F[_] : Sync](body: TxnInfo => F[S]): F[S] = body(txnInfo)

private def endSegmentOnError[S, F[_] : Sync](sync: F[S], segment: Segment) =
private def endSegmentOnError[S, F[_] : Sync](sync: F[S], segment: Segment): F[S] =
sync.handleErrorWith(t => {
segment.end()
Sync[F].raiseError(t)
Expand Down
Loading

0 comments on commit 4fe7fee

Please sign in to comment.