Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 3 additions & 6 deletions src/instana/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,9 @@ def key_to_bool(k: str) -> bool:
import inspect

all_accepted_patch_all_args = inspect.getfullargspec(monkey.patch_all)[0]
provided_options = provided_options.replace(" ", "").replace("--", "").split(",")
provided_options = (
provided_options.replace(" ", "").replace("--", "").split(",")
)

provided_options = [
k for k in provided_options if short_key(k) in all_accepted_patch_all_args
Expand Down Expand Up @@ -210,11 +212,6 @@ def boot_agent() -> None:
server as tornado_server, # noqa: F401
)

# Hooks
from instana.hooks import (
hook_gunicorn, # noqa: F401
)


def _start_profiler() -> None:
"""Start the Instana Auto Profile."""
Expand Down
8 changes: 6 additions & 2 deletions src/instana/agent/host.py
Original file line number Diff line number Diff line change
Expand Up @@ -457,9 +457,13 @@ def diagnostics(self) -> None:
logger.warning(
f"is_collector_thread_running?: {self.collector.is_reporting_thread_running()}"
)
logger.warning(
f"background_report_lock.locked?: {self.collector.background_report_lock.locked()}"
# RLock doesn't have a locked() method, so we check by trying to acquire
lock_acquired = self.collector.background_report_lock.acquire(
blocking=False
)
if lock_acquired:
self.collector.background_report_lock.release()
Comment on lines +464 to +465
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I may not be following correctly, but why are you releasing the lock in a function that mainly prints a diagnostic of the agent?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Normally, we were using the locked() method to check if the background_report_check is locked or not but as we moved to RLock, this new object doesn't have this method. We acquire the thread without blocking and release it immediately.

logger.warning(f"background_report_lock.locked?: {not lock_acquired}")
logger.warning(f"ready_to_start: {self.collector.ready_to_start}")
logger.warning(f"reporting_thread: {self.collector.reporting_thread}")
logger.warning(f"report_interval: {self.collector.report_interval}")
Expand Down
14 changes: 7 additions & 7 deletions src/instana/collector/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ def __init__(self, agent: Type["BaseAgent"]) -> None:
# Lock used synchronize reporting - no updates when sending
# Used by the background reporting thread. Used to synchronize report attempts and so
# that we never have two in progress at once.
self.background_report_lock = threading.Lock()
self.background_report_lock = threading.RLock()

# Reporting interval for the background thread(s)
self.report_interval = 1
Expand All @@ -68,12 +68,9 @@ def __init__(self, agent: Type["BaseAgent"]) -> None:

def is_reporting_thread_running(self) -> bool:
"""
Indicates if there is a thread running with the name self.THREAD_NAME
Checks if the collector is started and the reporting thread is alive.
"""
for thread in threading.enumerate():
if thread.name == self.THREAD_NAME:
return True
return False
return bool(self.reporting_thread and self.reporting_thread.is_alive())

def start(self) -> None:
"""
Expand All @@ -91,8 +88,9 @@ def start(self) -> None:
timer.start()
return
logger.debug(
f"BaseCollector.start non-fatal: call but thread already running (started: {self.started})"
f"BaseCollector.start: Skipping start call - reporting thread already running (started: {self.started})"
)
return

if self.agent.can_send():
logger.debug("BaseCollector.start: launching collection thread")
Expand Down Expand Up @@ -120,6 +118,8 @@ def shutdown(self, report_final: bool = True) -> None:
logger.debug("Collector.shutdown: Reporting final data.")
self.prepare_and_report_data()
self.started = False
# Clear the thread reference to ensure clean restart after fork
self.reporting_thread = None

def background_report(self) -> None:
"""
Expand Down
19 changes: 10 additions & 9 deletions src/instana/collector/host.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
"""

from time import time
from typing import DefaultDict, Any
from typing import Any, DefaultDict

from instana.collector.base import BaseCollector
from instana.collector.helpers.runtime import RuntimeHelper
Expand Down Expand Up @@ -43,19 +43,20 @@ def prepare_and_report_data(self) -> None:
state machine case.
"""
try:
if self.agent.machine.fsm.current == "wait4init":
with self.agent.machine.lock:
current_state = self.agent.machine.fsm.current

if current_state == "wait4init":
# Test the host agent if we're ready to send data
if self.agent.is_agent_ready():
if self.agent.machine.fsm.current != "good2go":
logger.debug("Agent is ready. Getting to work.")
self.agent.machine.fsm.ready()
with self.agent.machine.lock:
if self.agent.machine.fsm.current != "good2go":
logger.debug("Agent is ready. Getting to work.")
self.agent.machine.fsm.ready()
else:
return

if (
self.agent.machine.fsm.current == "good2go"
and self.agent.is_timed_out()
):
if current_state == "good2go" and self.agent.is_timed_out():
logger.info(
"The Instana host agent has gone offline or is no longer reachable for > 1 min. Will retry periodically."
)
Expand Down
69 changes: 49 additions & 20 deletions src/instana/fsm.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,14 +26,16 @@ class TheMachine:
RETRY_PERIOD = 30
THREAD_NAME = "Instana Machine"

warnedPeriodic = False

def __init__(self, agent: "HostAgent") -> None:
logger.debug("Initializing host agent state machine")

self._lock = threading.RLock()
self._warned_periodic = False

self.agent = agent
self.fsm = Fysom(
{
"initial": "*",
"events": [
("lookup", "*", "found"),
("announce", "found", "announced"),
Expand All @@ -42,7 +44,7 @@ def __init__(self, agent: "HostAgent") -> None:
],
"callbacks": {
# Can add the following to debug
# "onchangestate": self.print_state_change,
# "onchangestate": self.print_state_change,
"onlookup": self.lookup_agent_host,
"onannounce": self.announce_sensor,
"onpending": self.on_ready,
Expand All @@ -51,17 +53,33 @@ def __init__(self, agent: "HostAgent") -> None:
}
)

self.timer = threading.Timer(1, self.fsm.lookup)
self.timer.daemon = True
self.timer.name = self.THREAD_NAME
self.timer.start()
with self._lock:
self.timer = threading.Timer(1, self._safe_fsm_lookup)
self.timer.daemon = True
self.timer.name = self.THREAD_NAME
self.timer.start()

@staticmethod
def print_state_change(e: Any) -> None:
logger.debug(
f"========= ({os.getpid()}#{threading.current_thread().name}) FSM event: {e.event}, src: {e.src}, dst: {e.dst} =========="
)

def _safe_fsm_lookup(self) -> None:
"""Thread-safe wrapper for FSM lookup."""
with self._lock:
self.fsm.lookup()

def _safe_fsm_announce(self) -> None:
"""Thread-safe wrapper for FSM announce."""
with self._lock:
self.fsm.announce()

def _safe_fsm_pending(self) -> None:
"""Thread-safe wrapper for FSM pending."""
with self._lock:
self.fsm.pending()

def reset(self) -> None:
"""
reset is called to start from scratch in a process. It may be called on first boot or
Expand All @@ -73,14 +91,14 @@ def reset(self) -> None:
:return: void
"""
logger.debug("State machine being reset. Will start a new announce cycle.")
self.fsm.lookup()
self._safe_fsm_lookup()

def lookup_agent_host(self, e: Any) -> bool:
host = self.agent.options.agent_host
port = self.agent.options.agent_port

if self.agent.is_agent_listening(host, port):
self.fsm.announce()
self._safe_fsm_announce()
return True

if os.path.exists("/proc/"):
Expand All @@ -89,14 +107,15 @@ def lookup_agent_host(self, e: Any) -> bool:
if self.agent.is_agent_listening(host, port):
self.agent.options.agent_host = host
self.agent.options.agent_port = port
self.fsm.announce()
self._safe_fsm_announce()
return True

if self.warnedPeriodic is False:
logger.info(
"Instana Host Agent couldn't be found. Will retry periodically..."
)
self.warnedPeriodic = True
with self._lock:
if self._warned_periodic is False:
logger.info(
"Instana Host Agent couldn't be found. Will retry periodically..."
)
self._warned_periodic = True

self.schedule_retry(
self.lookup_agent_host, e, f"{self.THREAD_NAME}: agent_lookup"
Expand Down Expand Up @@ -143,17 +162,18 @@ def announce_sensor(self, e: Any) -> bool:
return False

self.agent.set_from(payload)
self.fsm.pending()
self._safe_fsm_pending()
logger.debug(
f"Announced PID: {pid} (true PID: {self.agent.announce_data.pid}). Waiting for Agent Ready..."
)
return True

def schedule_retry(self, fun: Callable, e: Any, name: str) -> None:
self.timer = threading.Timer(self.RETRY_PERIOD, fun, [e])
self.timer.daemon = True
self.timer.name = name
self.timer.start()
with self._lock:
self.timer = threading.Timer(self.RETRY_PERIOD, fun, [e])
self.timer.daemon = True
self.timer.name = name
self.timer.start()

def on_ready(self, _: Any) -> None:
self.agent.start()
Expand Down Expand Up @@ -258,3 +278,12 @@ def _get_cmdline(self, pid: int) -> List[str]:
except Exception:
logger.debug("Error getting command line: ", exc_info=True)
return sys.argv

@property
def lock(self) -> threading.RLock:
"""
Returns the thread lock used for synchronizing FSM state transitions.

:return: The RLock instance used for thread synchronization
"""
return self._lock
Empty file removed src/instana/hooks/__init__.py
Empty file.
25 changes: 0 additions & 25 deletions src/instana/hooks/hook_gunicorn.py

This file was deleted.

43 changes: 42 additions & 1 deletion tests/collector/test_base_collector.py
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,9 @@ def reporting_function():
name=self.collector.THREAD_NAME, target=reporting_function
)
sample_thread.start()
# Set the required state for is_reporting_thread_running to return True
self.collector.started = True
self.collector.reporting_thread = sample_thread
try:
assert self.collector.is_reporting_thread_running()
finally:
Expand Down Expand Up @@ -86,7 +89,7 @@ def test_start_collector_while_running_thread(
):
self.collector.start()
assert (
"BaseCollector.start non-fatal: call but thread already running (started: False)"
"BaseCollector.start: Skipping start call - reporting thread already running (started: False)"
in caplog.messages
)

Expand Down Expand Up @@ -207,3 +210,41 @@ def test_queued_profiles(
time.sleep(0.1)
profiles = self.collector.queued_profiles()
assert len(profiles) == 3

def test_is_reporting_thread_running_when_thread_is_none(self) -> None:
"""Test is_reporting_thread_running when reporting_thread is None."""
self.collector.reporting_thread = None
assert not self.collector.is_reporting_thread_running()

def test_is_reporting_thread_running_when_thread_is_dead(self) -> None:
"""Test is_reporting_thread_running when thread has finished."""

def quick_function():
pass

sample_thread = threading.Thread(target=quick_function)
sample_thread.start()
sample_thread.join() # Wait for thread to finish

self.collector.reporting_thread = sample_thread
assert not self.collector.is_reporting_thread_running()

def test_is_reporting_thread_running_when_started_false(self) -> None:
"""Test is_reporting_thread_running when started is False but thread exists."""
stop_event = threading.Event()

def reporting_function():
stop_event.wait()

sample_thread = threading.Thread(target=reporting_function)
sample_thread.start()

self.collector.started = False
self.collector.reporting_thread = sample_thread

try:
# Should still return True if thread is alive, regardless of started flag
assert self.collector.is_reporting_thread_running()
finally:
stop_event.set()
sample_thread.join()
Loading
Loading