土壌センサのデータ取得 – サンプルスクリプト[soil_sensor_mgr02.py]

 
サンプルスクリプト [soil_sensor_mgr02.py]

"""
  murata Soil Sensor SL5006 and RATOC Systems RS61G or USB60FP
  Evaluation Program V 1R0    05/Feb./2023  by C.Okamura RATOC Systems, Inc.
                       2R0    23/Feb./2023  Added Excel file access code

  Copyright 2023 RATOC Systems, Inc. All right reserved.
  running on Windows11 / Phython 3.11.1
"""
# -*- coding: utf-8 -*-


import serial
import time
import datetime
import openpyxl

# COM="COM4"       #USB60F
COM="COM5"       #SG61
bitRate=9600

# Open Serial port
ser = serial.Serial(COM, bitRate, timeout=10 )


wrtcmd = 2      # command for write data to register 
readcmd = 1     # command for read data from register
SNSR_CTR_reg = 7  # Sensor Control register
SNSR_STS_reg = 8  # Sensor Status register
FW_ver_reg   = 0  # F/W version register 3 bytes
SER_NUM_reg  = 3  # Serial Number register 4 bytes
TEMP_register = 19 # Temparature data register 2 buytes 0x13
EC_BULK_reg   = 21 # EC BULK data register 2 bytes 0x15
VWC_reg       = 25 # VWC data register 2 bytes 0X19
EC_PORE_reg   = 31 # EC_PORE data register 2 bytes 0x1f

# CRC16 ModBus caluculation
# Modbus-RTU polynomial 0x1021  X16 + X15 + X2 + X0 11000 0000 0000 0101 --> reverse bit order --> 0xa001

def crc16_Modbus(data : bytearray, offset , length):

    if data is None or offset < 0 or offset > len(data)- 1 and offset+length > len(data):
        return 0
    crc = 0xFFFF
    for i in range(0, length):
        crc ^= data[offset + i]
        for j in range(0,8):
            if (crc & 0x0001) == 1:      # check LSB
                crc =(crc >> 1) ^ 0xa001 
            else:
                crc = crc >> 1
    return crc.to_bytes(2, 'big')

# Receive message and data from SLT5006 Soil Sensor
def receive_response():
    rcv_hdr = ser.read( 3 )       # get header    
    f_code = rcv_hdr[0]           # get response code
    if f_code >= 0x80:
        er_code = rcv_hdr[1]      # pick up ERROR CODE
        er_crc  = ser.read( 1 )   # get CRCL
        rcv_msg = rcv_hdr + er_crc
    
    else:
        bcnt = rcv_hdr[2]             # get data byte count
        rcv_data = ser.read( bcnt + 2 )
        er_code = 0
        rcv_msg  = rcv_hdr + rcv_data
    
    return rcv_msg

#  Send Command to SLT5006
def send_cmd( tx_cmd ):
    ser.write( tx_cmd )
    return

# Send Start command for SL5006 
def Tx_start_cmd( ):
    f_code = 2       # wrtcmd
    register = 7     # SNSR_CTR_reg
    B_size = 1
    data_01 = 1
    data = bytes([ f_code, register, B_size, data_01 ])      # b"\x02\x07\x01\x01"
    offset = 0
    length = 4
    # Calculate CRC16_Modbus
    crc16mb = crc16_Modbus(data, offset, length)
    CRC16U = crc16mb[0]
    CRC16L = crc16mb[1]
    tx_cmd = bytes([ f_code, register, B_size, data_01, CRC16U, CRC16L ])
    # Send Command and receive response
    send_cmd( tx_cmd )
    rcv_resp = receive_response()
    # Check message from SLT5006
    msg_length = len( rcv_resp)
    
    if msg_length == 4 :
        print( "Error  Code :", rcv_resp[1] )
        print( rcv_resp)
        rtn_code0 = -1
    else:
        rtn_code0 = 1
        #print( rcv_resp )
        #print(" Samling Start")

    return rtn_code0

# Check Sensor Status. If Data_Ready then Retuen else continue to check the status. 
def Setup_status_check_cmd( ):
    f_code = 1      #readcmd
    register = 8    #SNSR_STS_reg
    B_size = 1
    data = bytes([ f_code, register, B_size ])      # b"\x01\x08\x01"
    offset = 0
    length = 3
    # Calculate CRC16_Modbus
    crc16mb = crc16_Modbus(data, offset, length)
    CRC16U = crc16mb[0]
    CRC16L = crc16mb[1]
    tx_cmd = bytes([ f_code, register, B_size, CRC16U, CRC16L ])
    return tx_cmd

def Check_sensor_status( schk_cmd ):
    send_cmd( schk_cmd )
    rcv_resp = receive_response()
    msg_length = len( rcv_resp)

    if msg_length == 4 :
        print( "Error  Code :", rcv_resp[1] )
        print( rcv_resp)
        rtn_code0 = -1
    
    if rcv_resp[3] == 0x01:
        rtn_code0 = 1
        print( "Data Ready")
        #print( rcv_resp )
        
    if rcv_resp[3] == 0:
        rtn_code0 = 0
        print( "Data not Ready")
        #print( rcv_resp )
    
    return rtn_code0    

# Get data from Soil Sensor. 
def Get_sensor_data( ):
    # print("Get Data")
    f_code = 1         #readcmd
    register = 19      # 0x13TEMP_register
    B_size = 16  # data msg length = 16 bytes
    data = bytes([ f_code, register, B_size ])      # b"\x01\x08\x01"
    offset = 0
    length = 3
    # Calculate CRC16_Modbus
    crc16mb = crc16_Modbus(data, offset, length)
    CRC16U = crc16mb[0]
    CRC16L = crc16mb[1]
    tx_cmd = bytes([ f_code, register, B_size, CRC16U, CRC16L ])

    send_cmd( tx_cmd )
    # wait response from Soil Sensor
    rcv_resp = receive_response()
    
    msg_length = len( rcv_resp)
    # print( "Rcv Length =", msg_length)
    if msg_length == 4 :
        print( "Error  Code :", rcv_resp[1] )
        print( rcv_resp)
        rtn_code0 = -1
    else:      #if msg_length == 18:
        rtn_code0 = 1
        #print( "Recive Data :", rcv_resp  )
        #
        temp = (( rcv_resp[4] << 8 ) | ( rcv_resp[3] & 0x00ff ))
        if temp >= 0x800:
            temp_dt = ((0x1000 - temp ) * -1 ) * 0.0625
        else:
            temp_dt = temp * 0.0625
        print( "Temparature :", temp_dt )
        #
        ec_bulk = (( rcv_resp[6] << 8 ) | ( rcv_resp[5] & 0x00ff )) * 0.001
        print( "EC_BULK :", ec_bulk )
        vwc = (( rcv_resp[10] << 8 ) | ( rcv_resp[9] & 0x00ff )) * 0.1
        print( "VWC :", vwc )
        ec_pore = (( rcv_resp[16] << 8 ) | ( rcv_resp[15] & 0x00ff )) * 0.001
        print( "EC_PORE :", ec_pore )
        comment = "new data"
        #   set data to Excel file

        # Open Excel file and select Shhet
        wb = openpyxl.load_workbook('sl5006_data_01.xlsx')
        sheet = wb['Sheet1']    # select default Sheet name.
        #  Pick up current bottom row number and get target row number
        nextrow = ( sheet['G3'].value ) + 1
        print("Next row :", nextrow )

        today = datetime.date.today()
        t_now = datetime.datetime.now().time()
        sheet.cell( row = nextrow, column = 1 ).value = today
        sheet.cell( row = nextrow, column = 2 ).value = t_now
        sheet.cell( row = nextrow, column = 3 ).value = temp_dt
        sheet.cell( row = nextrow, column = 4 ).value = ec_bulk
        sheet.cell( row = nextrow, column = 5 ).value = vwc
        sheet.cell( row = nextrow, column = 6 ).value = ec_pore
        sheet.cell( row = nextrow, column = 7 ).value = comment 

        if nextrow >= 87:   # 3data/day for 31days
            nextrow = 5     # set Topline

        sheet['G3'].value = nextrow

        # if set data into Topline then set F/W version and serial no.
        if nextrow == 6:
            fw_rev = Get_fwrev()
            ser_no = Get_sno()
            sheet['B2'].value = fw_rev
            sheet['B3'].value = ser_no

        # Close xlsx
        wb.save('sl5006_data_01.xlsx')
        wb.close()

    return rtn_code0


# Get Farmware version and Serial numberr. 
def Get_fwrev( ):
    # print("Get F/W Rev. and Serial no.")
    f_code = 1      # readcmd
    register = 0    # FW_ver_reg
    B_size = 3      # F/W ver 3bytes
    data = bytes([ f_code, register, B_size ])      # b"\x01\x00\x07"
    offset = 0
    length = 3
    # Calculate CRC16_Modbus
    crc16mb = crc16_Modbus(data, offset, length)
    CRC16U = crc16mb[0]
    CRC16L = crc16mb[1]
    tx_cmd = bytes([ f_code, register, B_size, CRC16U, CRC16L ])

    send_cmd( tx_cmd )
    # wait response from Soil Sensor
    rcv_resp = receive_response()
    
    msg_length = len( rcv_resp)
    # print( "Rcv Length =", msg_length)
    if msg_length == 4 :
        print( "Error  Code :", rcv_resp[1] )
        print( rcv_resp)
        rtn_code0 = -1
    else:      #if msg_length == 18:
        rtn_code0 = 1
        #print( "Recive Data :", rcv_resp  )
        #
        fw_ver_mjr = rcv_resp[3]
        fw_ver_min = rcv_resp[4]
        fw_rev = rcv_resp[5]
        fw_ver = str(fw_ver_mjr) + "." + str(fw_ver_min) + "." + str( fw_rev)
        #print( "F/W version :",fw_ver)
    return fw_ver

# Get Serial numberr. 
def Get_sno( ):
    # print("Get F/W Rev. and Serial no.")
    f_code = 1      # readcmd
    register = 3    # SER_No_reg
    B_size = 4      # Serial no. 4bytes
    data = bytes([ f_code, register, B_size ])      # b"\x01\x00\x07"
    offset = 0
    length = 3
    # Calculate CRC16_Modbus
    crc16mb = crc16_Modbus(data, offset, length)
    CRC16U = crc16mb[0]
    CRC16L = crc16mb[1]
    tx_cmd = bytes([ f_code, register, B_size, CRC16U, CRC16L ])

    send_cmd( tx_cmd )
    # wait response from Soil Sensor
    rcv_resp = receive_response()
    
    msg_length = len( rcv_resp)
    # print( "Rcv Length =", msg_length)
    if msg_length == 4 :
        print( "Error  Code :", rcv_resp[1] )
        print( rcv_resp)
        rtn_code0 = -1
    else:      #if msg_length == 18:
        rtn_code0 = 1
        #print( "Recive Data :", rcv_resp  )
        #
        ser_no = ((rcv_resp[6] << 24 ) | ( rcv_resp[5] << 16 ) | ( rcv_resp[4] << 8 ) | ( rcv_resp[3]))
        #print( "Serial No.:", ser_no)
    return ser_no
   


# start here

print( "Soil Data mesuring program V. 1R0")
fw_rev = Get_fwrev()
print( "F/W version :",fw_rev)
ser_no = Get_sno()
print("Serial No. :", ser_no)

print( "Sampling Start and wait Ready status")
rtn_cd = Tx_start_cmd()
if rtn_cd == 1:
    print("Samling start successfully ")

schk_cmd = Setup_status_check_cmd( )  #Set up Status Check command
wait_sts = 1

while wait_sts == 1 :
    rtn_cd = Check_sensor_status( schk_cmd )  #send command
    if rtn_cd == 0:
        wait_sts = 1
    
    else:
        wait_sts = 0

    time.sleep( 2 )      #Wait 2sec.
    
print("Get Data command")
rtn_cd = Get_sensor_data()
print("End") 


ser.close()


  ※ e2eStore会員ログインが必要です。