2024-04-01 10:40:20 +02:00

162 lines
5.6 KiB
Python

from NSCP import Settings, Registry, Core, log, status, log_error, sleep
from test_helper import BasicTest, TestResult, Callable, setup_singleton, install_testcases, init_testcases, shutdown_testcases
from types import *
import uuid
import os
def create_test_data(file):
    """Write the six-row, comma-separated fixture used by the tests.

    Every row has the form ``<number>,<letter>,<label>`` and ends with a
    newline. The target is opened in write mode, so any previous content
    of *file* is replaced.
    """
    rows = (
        "1,A,Test 1\n",
        "2,A,Test 2\n",
        "3,B,Test 1\n",
        "4,B,Test 1\n",
        "5,C,Test 1\n",
        "6,B,Test 2\n",
    )
    with open(file, "w") as handle:
        for row in rows:
            handle.write(row)
def delete_file(file):
    """Remove *file* if it exists, logging instead of raising on failure.

    Best-effort cleanup helper: a path that does not exist is ignored, and
    an ``OSError`` raised by ``os.remove`` is reported through ``log``
    rather than propagated to the caller.
    """
    if not os.path.exists(file):
        return
    try:
        os.remove(file)
    except OSError:
        # The previous ``except OSError, (errno, strerror):`` form is
        # Python 2-only syntax and bound two names that were never used;
        # this spelling parses on both Python 2 and Python 3.
        log('Failed to delete: %s'%file)
class LogFileTest(BasicTest):
    """Test case that runs ``check_logfile`` queries against a generated file.

    ``setup`` writes a small comma-separated fixture into the expanded
    ``${temp}`` directory, the ``run_*`` methods issue ``check_logfile``
    queries through ``self.core.simple_query`` and collect the outcome in
    ``TestResult`` objects, and ``teardown`` removes the fixture again.
    """

    # Handles fetched in init(); the class-level defaults keep the
    # attributes defined before init() has run.
    reg = None
    conf = None
    core = None

    def __init__(self):
        """Start with no temp directory and no fixture file."""
        self.temp_path = None
        # Every other method reads ``work_path``; initialising it here
        # (instead of the never-used ``work_file``) keeps teardown() safe
        # when setup() has not run.
        self.work_path = None

    def desc(self):
        """Return the description text of this test case."""
        return 'Testcase for check_file module'

    def title(self):
        """Return the title text of this test case."""
        return 'Win32File tests'

    def setup(self, plugin_id, prefix):
        """Create a uniquely named fixture file below ``${temp}``.

        *plugin_id* selects the registry handle; *prefix* is not used.
        """
        self.reg = Registry.get(plugin_id)
        self.temp_path = self.core.expand_path('${temp}')
        log('Temp: %s'%self.temp_path)
        # A UUID-based file name avoids clashing with earlier runs.
        self.work_path = os.path.join(self.temp_path, '%s.txt'%uuid.uuid4())
        log('Work: %s'%self.work_path)
        create_test_data(self.work_path)

    def teardown(self):
        """Delete the fixture file created by setup(), if there is one."""
        if self.work_path:
            delete_file(self.work_path)

    def get_count(self, perf):
        """Extract the leading integer value from a performance-data string.

        *perf* is expected to look like ``label=value;warn;crit``. Returns
        ``-1`` when *perf* is empty or has nothing after the first ``=``.
        Only the first ``;``-separated field after the first ``=`` is
        parsed, so additional fields or further entries no longer make
        the tuple unpacking fail. Raises ``ValueError`` if that field is
        not an integer.
        """
        if not perf:
            return -1
        _label, _sep, data = perf.partition('=')
        if not data:
            return -1
        return int(data.split(';')[0])

    def check_files(self, filter, text, expected):
        """Run ``check_logfile`` with *filter* and verify the match count.

        The warning and critical thresholds are both ``count gt expected``.
        The returned ``TestResult`` records whether the count parsed from
        the performance data equals *expected* and whether the query
        status is ``status.OK``. *text* is only used in the result title.
        """
        alias = '%s: %s'%(text, filter)
        result = TestResult('Checking %s'%alias)
        args = [
            'file=%s'%self.work_path,
            'column-split=,',
            'filter=%s'%filter,
            'warn=count gt %d'%expected,
            'crit=count gt %d'%expected,
        ]
        (ret, msg, perf) = self.core.simple_query('check_logfile', args)
        log("%s : %s -- %s"%(filter, msg, perf))
        count = self.get_count(perf)
        result.add_message(count == expected, '%s - number of files'%filter, 'got %s expected %s'%(count, expected))
        # The filter is now interpolated; previously the literal
        # '%s -- status' was passed through unformatted.
        result.add_message(ret == status.OK, '%s -- status'%filter, 'got %s expected OK'%ret)
        return result

    def check_bound(self, filter, warn, crit, expected):
        """Run ``check_logfile`` with explicit thresholds and verify the status.

        *warn* and *crit* are passed through as the ``warn=`` and ``crit=``
        arguments; the returned ``TestResult`` records whether the query
        status equals *expected*.
        """
        alias = '%s/%s/%s'%(filter, warn, crit)
        result = TestResult('Checking %s'%alias)
        args = [
            'file=%s'%self.work_path,
            'column-split=,',
            'filter=%s'%filter,
            'warn=%s'%warn,
            'crit=%s'%crit,
        ]
        (ret, msg, perf) = self.core.simple_query('check_logfile', args)
        log("%s : %s -- %s"%(filter, msg, perf))
        result.add_message(ret == expected, 'Check status', 'Invalid check status: %s'%ret)
        return result

    def run_filter_operator_test(self):
        """Check the match count for a series of filter expressions."""
        # NOTE(review): several expected counts are one higher than the
        # number of matching rows written by create_test_data (e.g. 'none'
        # expects 7 for six rows) -- presumably check_logfile also
        # evaluates an empty trailing line; confirm against the module
        # before changing any of these numbers.
        cases = (
            ('none', 'Count all lines', 7),
            ("column2 = 'A'", 'Count all A', 2),
            ("column2 = 'B'", 'Count all B', 3),
            ("column2 = 'C'", 'Count all C', 1),
            ("column3 = 'Test 1'", 'Count all T1', 4),
            ("column3 like 'Test'", 'Count all T', 6),
            ("column3 not like '1'", 'Count all T', 3),
            ("column1 > 1", 'Count all B', 5),
            ("column1 > 3", 'Count all B', 3),
            ("column1 > 5", 'Count all B', 1),
            ("column1 < 1", 'Count all B', 1),
            ("column1 < 3", 'Count all B', 3),
            ("column1 < 5", 'Count all B', 5),
            ("column1 = 1", 'Count all B', 1),
            ("column1 = 3", 'Count all B', 1),
            ("column1 = 5", 'Count all B', 1),
            ("column1 != 1", 'Count all B', 6),
            ("column1 != 3", 'Count all B', 6),
            ("column1 != 5", 'Count all B', 6),
        )
        result = TestResult('Filter tests')
        for expression, text, expected in cases:
            result.add(self.check_files(expression, text, expected))
        return result

    def run_boundry_test(self):
        """Check the returned status for a series of warn/crit thresholds."""
        cases = (
            ('none', 'count > 1', 'none', status.WARNING),
            ('none', 'none', 'count > 1', status.CRITICAL),
            ('column1 > 5', 'count > 2', 'count > 5', status.OK),
            ('column1 > 4', 'count > 2', 'count > 5', status.OK),
            ('column1 > 3', 'count > 2', 'count > 5', status.WARNING),
            ('column1 > 2', 'count > 2', 'count > 5', status.WARNING),
            ('column1 > 1', 'count > 2', 'count > 5', status.WARNING),
            ('column1 > 0', 'count > 2', 'count > 5', status.CRITICAL),
            ('column1 > 5', 'column1 = 3', 'none', status.OK),
            ('column1 > 0', 'column1 = 3', 'none', status.WARNING),
        )
        result = TestResult('Boundry tests')
        for expression, warn, crit, expected in cases:
            result.add(self.check_bound(expression, warn, crit, expected))
        return result

    def run_test(self):
        """Run the filter and boundary suites and return the combined result."""
        result = TestResult('Test')
        result.append(self.run_filter_operator_test())
        result.append(self.run_boundry_test())
        return result

    def install(self, arguments):
        """Write the module and script settings for this test, then save.

        *arguments* is not used.
        """
        conf = self.conf
        conf.set_string('/modules', 'test_disk', 'CheckLogFile')
        conf.set_string('/modules', 'pytest', 'PythonScript')
        conf.set_string('/settings/pytest/scripts', 'test_logfile', 'test_log_file.py')
        conf.save()

    def uninstall(self):
        """No-op: nothing is removed on uninstall."""

    def help(self):
        """No-op: this test case provides no help text."""

    def init(self, plugin_id, prefix):
        """Fetch the registry, core and settings handles for *plugin_id*.

        *prefix* is not used.
        """
        self.reg = Registry.get(plugin_id)
        self.core = Core.get(plugin_id)
        self.conf = Settings.get(plugin_id)

    def shutdown(self):
        """No-op: there is nothing to release on shutdown."""
# Hand LogFileTest to test_helper.setup_singleton and list it in all_tests,
# which the module-level __main__() and init() functions pass on to the
# test helper. NOTE(review): what setup_singleton does with the class is
# defined in test_helper and not visible here.
setup_singleton(LogFileTest)
all_tests = [LogFileTest]
def __main__(args):
    """Install every test case listed in ``all_tests``; *args* is not used."""
    install_testcases(all_tests)
def init(plugin_id, plugin_alias, script_alias):
    """Forward the plugin identifiers and ``all_tests`` to ``init_testcases``."""
    init_testcases(
        plugin_id,
        plugin_alias,
        script_alias,
        all_tests,
    )
def shutdown():
    """Delegate module shutdown to ``shutdown_testcases``."""
    shutdown_testcases()