First commit and first version of the add-on

This commit is contained in:
zxBCN Gallego_Izquierdo
2024-05-31 14:07:19 +02:00
commit 1094b4458d
423 changed files with 130141 additions and 0 deletions

View File

@@ -0,0 +1,110 @@
import json
import logging
import sys
import import_declare_test
from solnlib import conf_manager, log
from splunklib import modularinput as smi
ADDON_NAME = "Snet_check_addon"
def logger_for_input(input_name: str) -> logging.Logger:
return log.Logs().get_logger(f"{ADDON_NAME.lower()}_{input_name}")
def get_account_api_key(session_key: str, account_name: str):
cfm = conf_manager.ConfManager(
session_key,
ADDON_NAME,
realm=f"__REST_CREDENTIAL__#{ADDON_NAME}#configs/conf-Snet_check_addon_account",
)
account_conf_file = cfm.get_conf("Snet_check_addon_account")
return account_conf_file.get(account_name).get("api_key")
def get_data_from_api(logger: logging.Logger, api_key: str):
logger.info("Getting data from an external API")
dummy_data = [
{
"line1": "hello",
},
{
"line2": "world",
},
]
return dummy_data
class Input(smi.Script):
def __init__(self):
super().__init__()
def get_scheme(self):
scheme = smi.Scheme("Snet_Check_Addon")
scheme.description = "Snet_Check_Addon input"
scheme.use_external_validation = True
scheme.streaming_mode_xml = True
scheme.use_single_instance = False
scheme.add_argument(
smi.Argument(
"name", title="Name", description="Name", required_on_create=True
)
)
return scheme
def validate_input(self, definition: smi.ValidationDefinition):
return
def stream_events(self, inputs: smi.InputDefinition, event_writer: smi.EventWriter):
# inputs.inputs is a Python dictionary object like:
# {
# "Snet_Check_Addon://<input_name>": {
# "account": "<account_name>",
# "disabled": "0",
# "host": "$decideOnStartup",
# "index": "<index_name>",
# "interval": "<interval_value>",
# "python.version": "python3",
# },
# }
for input_name, input_item in inputs.inputs.items():
normalized_input_name = input_name.split("/")[-1]
logger = logger_for_input(normalized_input_name)
try:
session_key = self._input_definition.metadata["session_key"]
log_level = conf_manager.get_log_level(
logger=logger,
session_key=session_key,
app_name=ADDON_NAME,
conf_name=f"{ADDON_NAME}_settings",
)
logger.setLevel(log_level)
log.modular_input_start(logger, normalized_input_name)
api_key = get_account_api_key(session_key, input_item.get("account"))
data = get_data_from_api(logger, api_key)
sourcetype = "dummy-data"
for line in data:
event_writer.write_event(
smi.Event(
data=json.dumps(line, ensure_ascii=False, default=str),
index=input_item.get("index"),
sourcetype=sourcetype,
)
)
log.events_ingested(
logger,
input_name,
sourcetype,
len(data),
input_item.get("index"),
account=input_item.get("account")
)
log.modular_input_end(logger, normalized_input_name)
except Exception as e:
log.log_exception(logger, e, msg_before="Exception raised while ingesting data for demo_input: ")
if __name__ == "__main__":
exit_code = Input().run(sys.argv)
sys.exit(exit_code)

View File

@@ -0,0 +1,40 @@
import requests
import base64
import logging
logger = logging.getLogger(__name__)
class MenMiceLogin():
base_url = "https://ipam-api.eu.boehringer.com/mmws/api"
#base_url = "https://10.183.177.24/mmws/api"
def __init__(self, username, password) -> None:
self.auth = base64.b64encode(f"{username}:{password}".encode()).decode()
self.authenticate()
def authenticate(self):
# Código para autenticar
headers = {
"Authorization": f"Basic {self.auth}",
"Content-Type": "application/json"
}
login_url = f"{self.base_url}/login"
response = requests.get(login_url, headers=headers, verify=False)
if response.ok:
r_json = response.json()
self.token = r_json["result"]["session"]
logger.info("Login succesful to Men&Mice")
return self.token
else:
print("Fallo al hacer loggin")
raise Exception(f"Can't log to Men&Mice\r\n {r_json} \r\n {self.auth} \r\n {self.token}")
'''Código para probar el login'''
'''if __name__ == "__main__":
username = "x2ipmgmtsoar4pa"
password = "6g9e:q+!X&b~'W~@:~diO7z0qb.lQX"
men = MenMiceLogin(username, password)
response = men.authenticate()
print(response)'''

View File

@@ -0,0 +1,14 @@
import requests
from requests.auth import HTTPBasicAuth
class MenMiceQuery():
base_url = "https://ipam.eu.boehringer.com/mmws/api/IPAMRecords"
#base_url = "https://10.183.177.24/mmws/api"
def run_query(self, username, password, ip):
response = requests.get(f'{self.base_url}/{ip}/Range', auth=HTTPBasicAuth(username, password), verify=False)
if response.ok:
return response.json()
else:
print(response.json())
raise Exception(f"Error detected: {response.status_code}")

View File

@@ -0,0 +1,61 @@
import json
import os
import sys
import time
from menmice_query import MenMiceQuery
import logging
splunkhome = os.environ['SPLUNK_HOME']
sys.path.append(os.path.join(splunkhome, 'etc', 'apps', 'Snet_check_addon', 'lib'))
from splunklib.searchcommands import dispatch, GeneratingCommand, Configuration, Option, validators
from solnlib import conf_manager, log
ADDON_NAME = "Snet_check_addon"
def get_account_api_key(session_key: str, account_name: str):
cfm = conf_manager.ConfManager(
session_key,
ADDON_NAME,
realm=f"__REST_CREDENTIAL__#{ADDON_NAME}#configs/conf-snet_check_addon_account",
)
account_conf_file = cfm.get_conf("snet_check_addon_account")
username = account_conf_file.get(account_name).get("username")
password = account_conf_file.get(account_name).get("password")
logging.info(f"User is {username}")
return username, password
@Configuration()
class GenerateQueryCommand(GeneratingCommand):
ip = Option(require=True)
def run_query(self):
try:
self.logger.info("starting run_query menmice...")
session_key = self.service.token
self.logger.info("getting configuration menmice")
username, password = get_account_api_key(session_key, "MenMice")
self.logger.info("Configuration succesful menmice")
except Exception as e:
self.logger.debug(e, exc_info=1)
menmice = MenMiceQuery()
response = menmice.run_query(username, password, self.ip)
return response
def generate(self):
r_json = self.run_query()
snet = r_json['result']['range']['customProperties']['Usage']
if snet == "SNET":
snet = True
else:
snet = False
logging.info(f"Result of the SNET is {snet}")
result = {
"result": snet
}
result.update({'_time': time.time(), '_raw': json.dumps(result)})
yield result
dispatch(GenerateQueryCommand, sys.argv, sys.stdin, sys.stdout, __name__)