Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

cats effect 3: pass transaction info by implicits #578

Merged
merged 1 commit into from
Dec 2, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ public static void attemptExpireTokenRefCount(AgentBridge.TokenAndRefCount token
}

public static void setThreadTokenAndRefCount(AgentBridge.TokenAndRefCount tokenAndRefCount) {
if (tokenAndRefCount != null) {
if (tokenAndRefCount != null && tokenAndRefCount.token != null) {
logTokenInfo(tokenAndRefCount, "setting token to thread");
AgentBridge.activeToken.set(tokenAndRefCount);
tokenAndRefCount.token.link();
Expand Down
Original file line number Diff line number Diff line change
@@ -1,12 +1,13 @@
package com.newrelic.cats3.api

import cats.effect.Sync
import com.newrelic.api.agent.NewRelic
import com.newrelic.api.agent.weaver.Weaver
import com.newrelic.api.agent.weaver.scala.{ScalaMatchType, ScalaWeave}

@ScalaWeave(`type` = ScalaMatchType.Object, `originalName` = "com.newrelic.cats3.api.TraceOps")
class TraceOps_Instrumentation {
def txn[S, F[_]:Sync](body: F[S]): F[S] = {
Util.wrapTrace(Weaver.callOriginal)
def txn[S, F[_]:Sync](body: TxnInfo => F[S]): F[S] = {
Util.wrapTrace(body)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -2,32 +2,46 @@ package com.newrelic.cats3.api

import cats.effect.Sync
import cats.implicits._
import com.newrelic.agent.bridge.{AgentBridge, ExitTracer, Transaction, TracedMethod}
import com.newrelic.agent.bridge.{AgentBridge, ExitTracer, Token, Transaction}
import com.newrelic.api.agent.NewRelic

import java.util.concurrent.atomic.AtomicInteger

object Util {
val RETURN_OPCODE = 176

def wrapTrace[S, F[_] : Sync](body: F[S]): F[S] =
Sync[F].delay(AgentBridge.instrumentation.createScalaTxnTracer())
.redeemWith(_ => body,
tracer =>
if (tracer == null) {
body
} else {
for {
txnWithTracedMethod <- Sync[F].delay {
val agent = AgentBridge.getAgent
(agent.getTransaction(false), agent.getTracedMethod)
}
_ <- setupTokenAndRefCount(txnWithTracedMethod)
res <- attachErrorEvent(body, tracer)
_ <- cleanupTxnAndTokenRefCount(txnWithTracedMethod._1)
_ <- Sync[F].delay(tracer.finish(RETURN_OPCODE, null))
} yield res
}
)
def wrapTrace[S, F[_] : Sync](body: TxnInfo => F[S]): F[S] =
Sync[F].delay {
val tracer = AgentBridge.instrumentation.createScalaTxnTracer()
(tracer, optTxnInfo())
}.redeemWith(
_ => body(txnInfo),
tracerAndTxnInfo => {
val (tracer, optTxnInfo) = tracerAndTxnInfo
for {
_ <- Sync[F].delay(AgentBridge.getAgent.getTransaction(false))
updatedTxnInfo <- setupTokenAndRefCount(optTxnInfo, txnInfo)
res <- attachErrorEvent(body(updatedTxnInfo), tracer)
_ <- cleanupTxnAndTokenRefCount(updatedTxnInfo)
_ <- Sync[F].delay(tracer.finish(RETURN_OPCODE, null))
} yield res
}
)

private def txnInfo = {
val txn = NewRelic.getAgent.getTransaction
TxnInfo(txn, txn.getToken)
}

def optTxnInfo(): (Transaction, Token) = {
val txn = AgentBridge.getAgent.getTransaction(false)
if (txn != null) {
(txn, txn.getToken)
} else {
null
}
}


private def attachErrorEvent[S, F[_] : Sync](body: F[S], tracer: ExitTracer): F[S] =
body
Expand All @@ -36,23 +50,29 @@ object Util {
Sync[F].raiseError(throwable)
})

private def setupTokenAndRefCount[F[_] : Sync](txnWithTracerMethod: (Transaction, TracedMethod)): F[Unit] = Sync[F].delay {

val (txn, tracedMethod) = txnWithTracerMethod
if (txn != null && tracedMethod != null) {
AgentBridge.activeToken.set(
new AgentBridge.TokenAndRefCount(
txn.getToken,
tracedMethod,
new AtomicInteger(0)
))
private def setupTokenAndRefCount[F[_] : Sync](optTxn: (Transaction, Token), fallback: => TxnInfo)
: F[TxnInfo] =
Sync[F].delay {
if (optTxn != null) {
val (txn, token) = optTxn
AgentBridge.activeToken.set(
new AgentBridge.TokenAndRefCount(
token,
AgentBridge.getAgent.getTracedMethod,
new AtomicInteger(0)
)
)
TxnInfo(txn, token)
} else {
fallback
}
}
}

private def cleanupTxnAndTokenRefCount[F[_] : Sync](txn: Transaction): F[Unit] = Sync[F].delay {
AgentBridge.activeToken.remove()
if (txn != null) {
txn.expireAllTokens()
private def cleanupTxnAndTokenRefCount[F[_] : Sync](txnInfo: TxnInfo): F[Unit] = Sync[F].delay {
val tokenAndRefCount = AgentBridge.activeToken.get()
if (tokenAndRefCount != null) {
AgentBridge.activeToken.remove()
}
txnInfo.transaction.asInstanceOf[Transaction].expireAllTokens()
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -2,35 +2,40 @@ package com.newrelic.cats3.api

import cats.effect.Sync
import cats.implicits._
import com.newrelic.api.agent.{NewRelic, Segment, Token}

import com.newrelic.api.agent.{NewRelic, Segment, Token, Transaction}

case class TxnInfo(transaction: Transaction, token: Token)
object TraceOps {

private def txnInfo: TxnInfo = {
val txn = NewRelic.getAgent.getTransaction
TxnInfo(txn, txn.getToken)
}
/**
* Creates a segment to capture metrics for a given block of code, this will call {@link com.newrelic.api.agent.Transaction# startSegment ( String )},
* execute the code block, then call {@link com.newrelic.api.agent.Segment# end ( )}. This {@link Segment} will show up in the Transaction Breakdown
* table, as well as the Transaction Trace page. This {@link Segment} will be reported in the "Custom/" metric
* e.g. The code below will produce 1 segment <i>trace segment name</i>
* <pre>
* trace("trace segment name") {
* val i = 1
* val j = 2
* i + j
* }
* txn { implicit txnInfo =>
* trace("trace segment name") {
* val i = 1
* val j = 2
* i + j
* }
* }
* </pre>
*
* @param segmentName Name of the { @link Segment} segment in APM.
* This name will show up in the Transaction Breakdown table, as well as the Transaction Trace page.
* <p>
* if null or an empty String, the agent will report "Unnamed Segment".
* @param block Code block segment is to capture metrics for
* @param txnInfo implicit Transaction and Token segment is attached to
* @tparam S Type returned from executed code block
* @return Value returned by executed code block
*/
def trace[S](segmentName: String)(block: => S): S = {
val txn = NewRelic.getAgent.getTransaction()
val segment = txn.startSegment(segmentName)
def trace[S](segmentName: String)(block: => S)(implicit txnInfo: TxnInfo): S = {
val segment = txnInfo.transaction.startSegment(segmentName)
try {
block
} finally {
Expand All @@ -46,10 +51,12 @@ object TraceOps {
* This {@link Segment} will show up in the Transaction Breakdown table, as well as the Transaction Trace page. This {@link Segment} will be reported in the "Custom/" metric
* e.g. The code below will produce 2 segments <i>trace segment 1</i> and <i>trace segment 2</i>
* <pre>
* for {
* i <- asyncTrace("trace segment 1")(IO(1))
* j <- asyncTrace("trace segment 2")(IO(i + 1))
* } yield j
* txn { implicit txnInfo =>
* for {
* i <- asyncTrace("trace segment 1")(IO(1))
* j <- asyncTrace("trace segment 2")(IO(i + 1))
* } yield j
* }
* </pre>
*
* @param segmentName Name of the { @link Segment} segment in APM.
Expand All @@ -58,10 +65,11 @@ object TraceOps {
* if null or an empty String, the agent will report "Unnamed Segment".
* @param block F[S] value the segment is to capture metrics for.
* The block should return a { @link IO}
* @param txnInfo implicit Transaction and Token segment is attached to
* @return Value returned from completed asynchronous code block
*/
def asyncTrace[S, F[_] : Sync](segmentName: String)(block: F[S]): F[S] = for {
segment <- startSegment(segmentName)
def asyncTrace[S, F[_] : Sync](segmentName: String)(block: F[S])(implicit txnInfo: TxnInfo): F[S] = for {
segment <- Sync[F].delay(txnInfo.transaction.startSegment(segmentName))
res <- endSegmentOnError(block, segment)
_ <- Sync[F].delay(segment.end())
} yield res
Expand All @@ -73,24 +81,26 @@ object TraceOps {
* table, as well as the Transaction Trace page. This {@link Segment} will be reported in the "Custom/" metric
* e.g. the code below will produce a segment <i>trace map segment</i>
* <pre>
* IO(1)
* .map(traceFun("trace map segment")(i => i + 1))
* .filter(traceFun("trace filter segment")(i => i % 2 == 0))
* txn { implicit txnInfo =>
* IO(1)
* .map(traceFun("trace map 1 segment")(i => i + 1))
* .map(traceFun("trace map 2 segment")(i => i + 2))
* }
* </pre>
*
* @param segmentName Name of the { @link Segment} segment in APM.
* This name will show up in the Transaction Breakdown table, as well as the Transaction Trace page.
* <p>
* if null or an empty String, the agent will report "Unnamed Segment".
* @param f Function segment is to capture metrics for.
* @param txnInfo implicit Transaction and Token segment is attached to
* @tparam T Input type for function segment is to capture metrics for.
* @tparam S Type returned from executed function
* @return Value returned from executed function
*/
def traceFun[T, S](segmentName: String)(f: T => S): T => S = {
def traceFun[T, S](segmentName: String)(f: T => S)(implicit txnInfo: TxnInfo): T => S = {
t: T =>
val txn = NewRelic.getAgent.getTransaction()
val segment = txn.startSegment(segmentName)
val segment = txnInfo.transaction.startSegment(segmentName)
try {
f(t)
} finally {
Expand All @@ -106,21 +116,24 @@ object TraceOps {
* This {@link Segment} will show up in the Transaction Breakdown table, as well as the Transaction Trace page. This {@link Segment} will be reported in the "Custom/" metric
* e.g. The code below will produce 1 segment <i>trace flatMap segment</i>
* <pre>
* IO(1).flatMap(asyncTraceFun("trace flatMap segment")(i => IO(i + 1)))
* txn { implicit txnInfo =>
* IO(1).flatMap(asyncTraceFun("trace flatMap segment")(i => IO(i + 1)))
* }
* </pre>
*
* @param segmentName Name of the { @link Segment} segment in APM.
* This name will show up in the Transaction Breakdown table, as well as the Transaction Trace page.
* <p>
* if null or an empty String, the agent will report "Unnamed Segment".
* @param f Asynchronous function segment is to capture metrics for.
* @param txnInfo implicit Transaction and Token segment is attached to
* @tparam T Input type for function segment is to capture metrics for.
* @tparam S Type returned from completed asynchronous function
* @return Value returned from completed asynchronous function
*/
def asyncTraceFun[T, S, F[_] : Sync](segmentName: String)(f: T => F[S]): T => F[S] = { t: T =>
def asyncTraceFun[T, S, F[_] : Sync](segmentName: String)(f: T => F[S])(implicit txnInfo: TxnInfo): T => F[S] = { t: T =>
for {
segment <- startSegment(segmentName)
segment <- Sync[F].delay(txnInfo.transaction.startSegment(segmentName))
evaluatedFunc <- endSegmentOnError(f(t), segment)
_ <- Sync[F].delay(segment.end())
} yield evaluatedFunc
Expand All @@ -133,23 +146,18 @@ object TraceOps {
* The newly created {@link com.newrelic.api.agent.Transaction} will complete once the code block has been executed
* e.g. the code below will create a Transaction and with 2 segments <i>trace map IO</i> and <i>trace filter IO</i>
* <pre>
* txn {
* IO(1).map(traceFun("trace map IO")(i => i + 1))
* }
* txn { implicit txnInfo =>
* IO(1).map(traceFun("trace map IO")(i => i + 1))
* }
* </pre>
*
* @param block Code block to be executed inside a transaction
* @tparam S Type returned by code block
* @return Value returned by code block
*/
def txn[S, F[_] : Sync](body: F[S]): F[S] = body

private def startSegment[F[_] : Sync](segmentName: String): F[Segment] = Sync[F].delay {
val txn = NewRelic.getAgent.getTransaction()
txn.startSegment(segmentName)
}
def txn[S, F[_] : Sync](body: TxnInfo => F[S]): F[S] = body(txnInfo)

private def endSegmentOnError[S, F[_] : Sync](sync: F[S], segment: Segment) =
private def endSegmentOnError[S, F[_] : Sync](sync: F[S], segment: Segment): F[S] =
sync.handleErrorWith(t => {
segment.end()
Sync[F].raiseError(t)
Expand Down
Loading