Skip to content

topl-core-base / io.matthewnelson.topl_core_base / EventBroadcaster

EventBroadcaster

abstract class EventBroadcaster :BaseConsts (source)

Service for sending event logs to the system.

Both topl-core and topl-service utilize this class to broadcast messages. This allows for easier separation of messages based on the type, process or class.

See BaseConsts.BroadcastTypes

/**
 * [io.matthewnelson.topl_core.OnionProxyManager] utilizes this customized class for
 * broadcasting things while it is operating (such as Tor's State, operation errors,
 * debugging, etc).
 *
 * [ServiceEventListener] utilizes this class by sending it what Tor is spitting out
 * (selectively curated, ofc).
 *
 * @param [torService] [BaseService] for context.
 * */
internal class ServiceEventBroadcaster private constructor(
    private val torService: BaseService
): EventBroadcaster() {

    companion object {
        @JvmSynthetic
        fun instantiate(torService: BaseService): ServiceEventBroadcaster =
            ServiceEventBroadcaster(torService)
    }

    private val scopeMain: CoroutineScope
        get() = torService.getScopeMain()

    /////////////////
    /// Bandwidth ///
    /////////////////
    @Volatile
    private var bytesRead = 0L
    @Volatile
    private var bytesWritten = 0L

    override fun broadcastBandwidth(bytesRead: String, bytesWritten: String) {
        val read =
            try {
                bytesRead.toLong()
            } catch (e: NumberFormatException) {
                this.bytesRead
            }
        val written =
            try {
                bytesWritten.toLong()
            } catch (e: NumberFormatException) {
                this.bytesWritten
            }

        // Only update the notification if proper State is had & we're bootstrapped.
        if (torState == TorState.ON &&
            torNetworkState == TorNetworkState.ENABLED &&
            isBootstrappingComplete()
        ) {
            if (read != this.bytesRead || written != this.bytesWritten) {
                this.bytesRead = read
                this.bytesWritten = written

                updateBandwidth(read, written)

                if (read == 0L && written == 0L)
                    torService.updateNotificationIcon(NotificationImage.ENABLED)
                else
                    torService.updateNotificationIcon(NotificationImage.DATA)
            }
        }

        TorServiceController.appEventBroadcaster?.let {
            scopeMain.launch { it.broadcastBandwidth(bytesRead, bytesWritten) }
        }
    }

    /**
     * Do a check for if a message is being displayed in the contentText of the
     * notification, allowing it to remain there unabated until the coroutine
     * finishes.
     * */
    private fun updateBandwidth(download: Long, upload: Long) {
        if (noticeMsgToContentTextJob?.isActive == true) return
        torService.updateNotificationContentText(
            ServiceUtilities.getFormattedBandwidthString(download, upload)
        )
    }


    /////////////
    /// Debug ///
    /////////////
    override fun broadcastDebug(msg: String) {
        TorServiceController.appEventBroadcaster?.let {
            scopeMain.launch { it.broadcastDebug(msg) }
        }
    }


    //////////////////
    /// Exceptions ///
    //////////////////
    override fun broadcastException(msg: String?, e: Exception) {
        if (!msg.isNullOrEmpty()) {
            if (msg.contains(TorService::class.java.simpleName)) {
                torService.updateNotificationIcon(NotificationImage.ERROR)
                val msgSplit = msg.split("|")
                msgSplit.elementAtOrNull(2)?.let {
                    torService.updateNotificationContentText(it)
                    torService.updateNotificationProgress(false, null)
                }
            }
        }

        TorServiceController.appEventBroadcaster?.let {
            scopeMain.launch { it.broadcastException(msg, e) }
        }
    }


    ///////////////////
    /// LogMessages ///
    ///////////////////
    override fun broadcastLogMessage(logMessage: String?) {
        TorServiceController.appEventBroadcaster?.let {
            scopeMain.launch { it.broadcastLogMessage(logMessage) }
        }
    }


    ///////////////
    /// Notices ///
    ///////////////
    private var noticeMsgToContentTextJob: Job? = null

    @Volatile
    private var bootstrapProgress = ""
    private fun isBootstrappingComplete(): Boolean =
        bootstrapProgress == "Bootstrapped 100%"

    @Volatile
    private var controlPort: String? = null
    @Volatile
    private var dnsPort: String? = null
    @Volatile
    private var httpTunnelPort: String? = null
    @Volatile
    private var socksPort: String? = null
    @Volatile
    private var transPort: String? = null

    private fun setAllPortsNull() {
        controlPort = null
        dnsPort = null
        httpTunnelPort = null
        socksPort = null
        transPort = null
    }

    override fun broadcastNotice(msg: String) {

        when {
            // ServiceActionProcessor
            msg.contains(ServiceActionProcessor::class.java.simpleName) -> {
                handleServiceActionProcessorMsg(msg)
            }
            // WARN|OnionProxyManager|No Network Connectivity. Foregoing enabling of Tor Network.
            msg.contains("No Network Connectivity. Foregoing enabling of Tor Network.") -> {
                torService.updateNotificationProgress(false, null)
                torService.updateNotificationContentText("No Network Connectivity. Waiting...")
            }
            // BOOTSTRAPPED
            msg.contains("Bootstrapped") -> {
                handleBootstrappedMsg(msg)
            }
            // Control Port
            // NOTICE|OnionProxyManager|Successfully connected to Control Port: 44201
            msg.contains("Successfully connected to Control Port:") -> {
                controlPort = getPortFromMsg(msg)
                if (isBootstrappingComplete())
                    updateAppEventBroadcasterWithPortInfo()
            }
            // Dns Port
            // NOTICE|OnionProxyManager|Opened DNS listener on 127.0.0.1:5400
            msg.contains("Opened DNS listener ") -> {
                dnsPort = getPortFromMsg(msg)
                if (isBootstrappingComplete())
                    updateAppEventBroadcasterWithPortInfo()
            }
            // Http Tunnel Port
            // NOTICE|BaseEventListener|Opened HTTP tunnel listener on 127.0.0.1:8118
            msg.contains("Opened HTTP tunnel listener ") -> {
                httpTunnelPort = getPortFromMsg(msg)
                if (isBootstrappingComplete())
                    updateAppEventBroadcasterWithPortInfo()
            }
            // Socks Port
            // NOTICE|BaseEventListener|Opened Socks listener on 127.0.0.1:9050
            msg.contains("Opened Socks listener ") -> {
                socksPort = getPortFromMsg(msg)
                if (isBootstrappingComplete())
                    updateAppEventBroadcasterWithPortInfo()
            }
            // Trans Port
            // NOTICE|BaseEventListener|Opened Transparent pf/netfilter listener on 127.0.0.1:9040
            msg.contains("Opened Transparent pf/netfilter listener ") -> {
                transPort = getPortFromMsg(msg)
                if (isBootstrappingComplete())
                    updateAppEventBroadcasterWithPortInfo()
            }
            // NEWNYM
            msg.contains(TorControlCommands.SIGNAL_NEWNYM) -> {
                handleNewNymMsg(msg)
            }
        }

        TorServiceController.appEventBroadcaster?.let {
            scopeMain.launch { it.broadcastNotice(msg) }
        }
    }

    // NOTICE|BaseEventListener|Bootstrapped 5% (conn): Connecting to a relay
    private fun handleBootstrappedMsg(msg: String) {
        val msgSplit = msg.split(" ")
        msgSplit.elementAtOrNull(2)?.let {
            val bootstrapped = "${msgSplit[0]} ${msgSplit[1]}".split("|")[2]

            if (bootstrapped != bootstrapProgress) {
                torService.updateNotificationContentText(bootstrapped)

                if (bootstrapped == "Bootstrapped 100%") {
                    updateAppEventBroadcasterWithPortInfo()
                    torService.updateNotificationIcon(NotificationImage.ENABLED)
                    torService.updateNotificationProgress(true, 100)
                    torService.updateNotificationProgress(false, null)
                    torService.addNotificationActions()
                } else {
                    val progress: Int? = try {
                        bootstrapped.split(" ")[1].split("%")[0].toInt()
                    } catch (e: Exception) {
                        null
                    }
                    progress?.let {
                        torService.updateNotificationProgress(true, progress)
                    }
                }

                bootstrapProgress = bootstrapped
            }
        }
    }

    private fun getPortFromMsg(msg: String): String =
        "127.0.0.1:${msg.split(":")[1].trim()}"

    private fun handleNewNymMsg(msg: String) {
        val msgToShow: String? =
            when {
                msg.contains(OnionProxyManager.NEWNYM_SUCCESS_MESSAGE) -> {
                    OnionProxyManager.NEWNYM_SUCCESS_MESSAGE
                }
                msg.contains(OnionProxyManager.NEWNYM_NO_NETWORK) -> {
                    OnionProxyManager.NEWNYM_NO_NETWORK
                }
                else -> {
                    val msgSplit = msg.split("|")
                    msgSplit.elementAtOrNull(2)
                }
            }

        if (noticeMsgToContentTextJob?.isActive == true)
            noticeMsgToContentTextJob?.cancel()

        msgToShow?.let {
            displayMessageToContentText(it, 3500L)
        }
    }

    private fun handleServiceActionProcessorMsg(msg: String) {
        val msgSplit = msg.split("|")
        val msgToShow: String? = msgSplit.elementAtOrNull(2)?.let {
            when (it) {
                ServiceActionName.RESTART_TOR -> {
                    "Restarting Tor..."
                }
                ServiceActionName.START -> {
                    // Need to check here if Tor is already on, as startTor can be called
                    // anytime which would overwrite the contentText already showing and
                    // then stay there until something else updates it.
                    if (torState != TorState.ON)
                        "Starting Tor..."
                    else
                        null
                }
                ServiceActionName.STOP -> {
                    "Stopping Service..."
                }
                else -> {
                    null
                }
            }
        }
        msgToShow?.let {
            torService.updateNotificationContentText(it)
        }
    }

    private var broadcastPortInfoJob: Job? = null
    private fun updateAppEventBroadcasterWithPortInfo() {
        if (broadcastPortInfoJob?.isActive == true)
            broadcastPortInfoJob?.cancel()

        TorServiceController.appEventBroadcaster?.let {
            broadcastPortInfoJob = scopeMain.launch {
                delay(100L)
                it.broadcastPortInformation(
                    TorPortInfo(
                        controlPort,
                        dnsPort,
                        httpTunnelPort,
                        socksPort,
                        transPort
                    )
                )
            }
        }
    }

    /**
     * Display a message in the notification's ContentText space for the defined
     * [delayMilliSeconds], after which (if Tor is connected), publish to the Notification's
     * ContentText the most recently broadcast bandwidth via [bytesRead] && [bytesWritten].
     * */
    private fun displayMessageToContentText(message: String, delayMilliSeconds: Long) {
        noticeMsgToContentTextJob = scopeMain.launch {
            torService.updateNotificationContentText(message)
            delay(delayMilliSeconds)

            // Publish the last bandwidth broadcast to overwrite the message.
            if (torNetworkState == TorNetworkState.ENABLED) {
                torService.updateNotificationContentText(
                    ServiceUtilities.getFormattedBandwidthString(bytesRead, bytesWritten)
                )
            } else if (isBootstrappingComplete()){
                torService.updateNotificationContentText(
                    ServiceUtilities.getFormattedBandwidthString(0L, 0L)
                )
            }
        }
    }


    ////////////////
    /// TorState ///
    ////////////////
    @Volatile
    private var torState = TorState.OFF
    @Volatile
    private var torNetworkState = TorNetworkState.DISABLED

    override fun broadcastTorState(@TorState state: String, @TorNetworkState networkState: String) {
        if (torState == TorState.ON && state != torState) {
            bootstrapProgress = ""
            setAllPortsNull()
            updateAppEventBroadcasterWithPortInfo()
            torService.removeNotificationActions()
        }

        if (state != TorState.ON)
            torService.updateNotificationProgress(true, null)

        torService.updateNotificationContentTitle(state)
        torState = state

        if (networkState == TorNetworkState.DISABLED) {
            if (isBootstrappingComplete()) {
                setAllPortsNull()
                updateAppEventBroadcasterWithPortInfo()
            }

            // Update torNetworkState _before_ setting the icon to `disabled` so bandwidth won't
            // overwrite the icon with an update
            torNetworkState = networkState
            torService.updateNotificationIcon(NotificationImage.DISABLED)
        } else {
            if (isBootstrappingComplete())
                torService.updateNotificationIcon(NotificationImage.ENABLED)

            // Update torNetworkState _after_ setting the icon to `enabled` so bandwidth changes
            // occur afterwards and this won't overwrite ImageState.DATA
            torNetworkState = networkState
        }

        TorServiceController.appEventBroadcaster?.let {
            scopeMain.launch { it.broadcastTorState(state, networkState) }
        }
    }
}

Constructors

Name Summary
<init> Service for sending event logs to the system.EventBroadcaster()

Functions

Name Summary
broadcastBandwidth bytesRead = bytes downloaded bytesWritten = bytes uploadedabstract fun broadcastBandwidth(bytesRead:String, bytesWritten:String):Unit
broadcastDebug (“DEBUG
broadcastException (“EXCEPTION
broadcastLogMessage Not yet implemented in either module.abstract fun broadcastLogMessage(logMessage:String?):Unit
broadcastNotice Will be one of:abstract fun broadcastNotice(msg:String):Unit
broadcastTorState See BaseConsts.TorState and BaseConsts.TorNetworkStateabstract fun broadcastTorState(state:String, networkState:String):Unit