diff --git a/README.md b/README.md index beca0ba..a007667 100644 --- a/README.md +++ b/README.md @@ -68,6 +68,7 @@ supported. | GlucoRx | NexusQ | `td42xx` | [construct] [pyserial]² [hidapi] | | Menarini | GlucoMen Nexus | `td42xx` | [construct] [pyserial]² [hidapi] | | Aktivmed | GlucoCheck XL | `td42xx` | [construct] [pyserial]² [hidapi] | +| Ascensia | Contour Next | `contournext` | [construct] [hidapi]‡ | | Ascensia | ContourUSB | `contourusb` | [construct] [hidapi]‡ | | Menarini | GlucoMen areo³ | `glucomenareo` | [pyserial] [crcmod] | diff --git a/glucometerutils/drivers/contournext.py b/glucometerutils/drivers/contournext.py new file mode 100644 index 0000000..dd63ca3 --- /dev/null +++ b/glucometerutils/drivers/contournext.py @@ -0,0 +1,85 @@ +# -*- coding: utf-8 -*- +# +# SPDX-FileCopyrightText: © 2026 The glucometerutils Authors +# SPDX-License-Identifier: MIT +"""Driver for Contour Next devices. + +Supported features: + - get readings (blood glucose), including comments; + - get date and time; + - get serial number and software version; + - get device info (e.g. unit) + +Expected device path: /dev/hidraw4 or similar HID device. Optional when using +HIDAPI. + +Further information on the device protocol can be found at + +http://protocols.ascensia.com/Programming-Guide.aspx + +""" + +import datetime +from collections.abc import Generator +from typing import NoReturn, Optional + +from glucometerutils import common +from glucometerutils.support import contourusb + + +def _extract_timestamp(parsed_record: dict[str, str]): + """Extract the timestamp from a parsed record. + + This leverages the fact that all the reading records have the same base structure. + """ + # Pad with zeros for missing time components + datetime_str = parsed_record["datetime"].ljust(14, '0') + return datetime.datetime.strptime(datetime_str, '%Y%m%d%H%M%S') + + +class Device(contourusb.ContourHidDevice): + """Glucometer driver for Contour Next devices.""" + + def __init__(self, device: Optional[str]) -> None: + super().__init__( + (0x1A79, 0x7900), + device, + header_record_re=contourusb._HEADER_RECORD_RE_NEXT, + ) + + def get_meter_info(self) -> common.MeterInfo: + self._get_info_record() + return common.MeterInfo( + "Contour Next", + serial_number=self._get_serial_number(), + version_info=("Meter versions: " + self._get_version(),), + native_unit=self.get_glucose_unit(), + ) + + def get_glucose_unit(self) -> common.Unit: + if self._get_glucose_unit() == "0": + return common.Unit.MG_DL + else: + return common.Unit.MMOL_L + + def get_readings(self) -> Generator[common.AnyReading, None, None]: + """ + Get reading dump from download data mode(all readings stored) + This meter supports only blood samples + """ + for parsed_record in self._get_multirecord(): + yield common.GlucoseReading( + _extract_timestamp(parsed_record), + int(parsed_record["value"]), + comment=parsed_record["markers"], + measure_method=common.MeasurementMethod.BLOOD_SAMPLE, + ) + + def get_serial_number(self) -> NoReturn: + raise NotImplementedError + + def _set_device_datetime(self, date: datetime.datetime) -> NoReturn: + raise NotImplementedError + + def zero_log(self) -> NoReturn: + raise NotImplementedError diff --git a/glucometerutils/drivers/contourusb.py b/glucometerutils/drivers/contourusb.py index 578a1a6..b7b7a2f 100644 --- a/glucometerutils/drivers/contourusb.py +++ b/glucometerutils/drivers/contourusb.py @@ -32,23 +32,20 @@ def _extract_timestamp(parsed_record: dict[str, str]): This leverages the fact that all the reading records have the same base structure. """ - datetime_str = parsed_record["datetime"] - - return datetime.datetime( - int(datetime_str[0:4]), # year - int(datetime_str[4:6]), # month - int(datetime_str[6:8]), # day - int(datetime_str[8:10]), # hour - int(datetime_str[10:12]), # minute - 0, - ) + # Pad with zeros for missing time components + datetime_str = parsed_record["datetime"].ljust(14, '0') + return datetime.datetime.strptime(datetime_str, '%Y%m%d%H%M%S') class Device(contourusb.ContourHidDevice): - """Glucometer driver for Contour devices.""" + """Glucometer driver for Contour USB devices.""" def __init__(self, device: Optional[str]) -> None: - super().__init__((0x1A79, 0x6002), device) + super().__init__( + (0x1A79, 0x6002), + device, + header_record_re=contourusb._HEADER_RECORD_RE_USB, + ) def get_meter_info(self) -> common.MeterInfo: self._get_info_record() diff --git a/glucometerutils/drivers/tests/test_contournext.py b/glucometerutils/drivers/tests/test_contournext.py new file mode 100644 index 0000000..b3c3b56 --- /dev/null +++ b/glucometerutils/drivers/tests/test_contournext.py @@ -0,0 +1,95 @@ +# SPDX-FileCopyrightText: 2026 The glucometerutils Authors +# +# SPDX-License-Identifier: MIT + +"""Tests for the Contour Next protocol support.""" + +# pylint: disable=protected-access,missing-docstring + +import datetime +from unittest.mock import Mock + +from absl.testing import absltest + +from glucometerutils.support import contourusb + + +class TestContourNext(absltest.TestCase): + header_record = b"\x04\x021H|\\^&||0t4cvJ|Contour7900^02.13\\01.00\\02.40^7901H33A1578|A=0^C=6^R=0^S=0^U=0^V=10600^X=070070180130^a=0^J=0|25|||||P|1|20260208104218|\r\x171A\r\n\x05" + + def setUp(self): + super().setUp() + self.mock_dev = Mock() + self.mock_dev._header_record_re = contourusb._HEADER_RECORD_RE_NEXT + + def test_get_datetime(self): + self.mock_dev.datetime = "20260208104218" + self.assertEqual( + datetime.datetime(2026, 2, 8, 10, 42, 18), + contourusb.ContourHidDevice.get_datetime(self.mock_dev), + ) + + def test_RECORD_FORMAT_match(self): + header_record_decoded = self.header_record.decode() + stx = header_record_decoded.find("\x02") + + result = contourusb._RECORD_FORMAT.match(header_record_decoded[stx:]).group("text") + + self.assertEqual( + "H|\\^&||0t4cvJ|Contour7900^02.13\\01.00\\02.40^7901H33A1578|A=0^C=6^R=0^S=0^U=0^V=10600^X=070070180130^a=0^J=0|25|||||P|1|20260208104218|", + result, + ) + + def test_parse_header_record(self): + header_record_decoded = self.header_record.decode() + stx = header_record_decoded.find("\x02") + + result = contourusb._RECORD_FORMAT.match(header_record_decoded[stx:]).group("text") + contourusb.ContourHidDevice.parse_header_record(self.mock_dev, result) + + self.assertEqual(self.mock_dev.field_del, "\\") + self.assertEqual(self.mock_dev.repeat_del, "^") + self.assertEqual(self.mock_dev.component_del, "&") + self.assertEqual(self.mock_dev.escape_del, "|") + + self.assertEqual(self.mock_dev.product_code, "Contour7900") + + self.assertEqual(self.mock_dev.dig_ver, "02.13") + self.assertEqual(self.mock_dev.anlg_ver, "01.00") + self.assertEqual(self.mock_dev.agp_ver, "02.40") + self.assertEqual(self.mock_dev.serial_num, "7901H33A1578") + + self.assertEqual(self.mock_dev.res_marking, "0") + self.assertEqual(self.mock_dev.config_bits, "6") + + self.assertEqual(self.mock_dev.ref_method, "0") + self.assertEqual(self.mock_dev.internal, "0") + self.assertEqual(self.mock_dev.unit, "0") + self.assertEqual(self.mock_dev.lo_bound, "10") + self.assertEqual(self.mock_dev.hi_bound, "600") + + self.assertEqual(self.mock_dev.post_food_low, "070") + self.assertEqual(self.mock_dev.pre_food_low, "070") + + self.assertEqual(self.mock_dev.post_food_high, "180") + self.assertEqual(self.mock_dev.pre_food_high, "130") + + self.assertEqual(self.mock_dev.total, "25") + self.assertEqual(self.mock_dev.spec_ver, "1") + + self.assertEqual(self.mock_dev.datetime, "20260208104218") + + def test_parse_result_record(self): + result_record = "R|3|^^^Glucose|126|mg/dL^P||T0||20260125085519" + result_dict = contourusb.ContourHidDevice.parse_result_record( + self.mock_dev, result_record + ) + + self.assertEqual(result_dict["record_type"], "R") + self.assertEqual(result_dict["seq_num"], "3") + self.assertEqual(result_dict["test_id"], "Glucose") + self.assertEqual(result_dict["value"], "126") + self.assertEqual(result_dict["unit"], "mg/dL") + self.assertEqual(result_dict["ref_method"], "P") + self.assertEqual(result_dict["markers"], "T0") + self.assertEqual(result_dict["datetime"], "20260125085519") diff --git a/glucometerutils/drivers/tests/test_contourusb.py b/glucometerutils/drivers/tests/test_contourusb.py index 107b4d4..50f80f1 100644 --- a/glucometerutils/drivers/tests/test_contourusb.py +++ b/glucometerutils/drivers/tests/test_contourusb.py @@ -16,15 +16,19 @@ class TestContourUSB(absltest.TestCase): header_record = b"\x04\x021H|\\^&||7w3LBL|Bayer7390^01.24\\01.04\\09.02.20^7390-2336773^7403-|A=1^C=63^G=1^I=0200^R=0^S=1^U=0^V=10600^X=070070070070180130150250^Y=360126090050099050300089^Z=1|1714||||||1|201909221304\r\x17D7\r\n\x05" - mock_dev = Mock() + def setUp(self): + super().setUp() + self.mock_dev = Mock() + self.mock_dev._header_record_re = contourusb._HEADER_RECORD_RE_USB def test_get_datetime(self): import datetime - self.datetime = "201908071315" # returned by + # datetime is padded to 14 chars (YYYYMMDDHHMMSS) by parse_header_record + self.mock_dev.datetime = "20190807131500" self.assertEqual( - datetime.datetime(2019, 8, 7, 13, 15), - contourusb.ContourHidDevice.get_datetime(self), + datetime.datetime(2019, 8, 7, 13, 15, 0), + contourusb.ContourHidDevice.get_datetime(self.mock_dev), ) def test_RECORD_FORMAT_match(self): @@ -92,7 +96,8 @@ def test_parse_header_record(self): self.assertEqual(self.mock_dev.total, "1714") self.assertEqual(self.mock_dev.spec_ver, "1") - self.assertEqual(self.mock_dev.datetime, "201909221304") + # datetime is padded to 14 chars (YYYYMMDDHHMMSS) + self.assertEqual(self.mock_dev.datetime, "20190922130400") # TO-DO checksum and checkframe unit tests diff --git a/glucometerutils/support/contourusb.py b/glucometerutils/support/contourusb.py index 259e8d1..af3fe00 100644 --- a/glucometerutils/support/contourusb.py +++ b/glucometerutils/support/contourusb.py @@ -14,6 +14,7 @@ import datetime import enum +import logging import re from collections.abc import Generator from typing import Optional @@ -21,8 +22,10 @@ from glucometerutils import driver from glucometerutils.support import hiddevice +logger = logging.getLogger(__name__) + # regexr.com/4k6jb -_HEADER_RECORD_RE = re.compile( +_HEADER_RECORD_RE_USB = re.compile( "^(?P[a-zA-Z])\\|(?P.)(?P.)" "(?P.)(?P.)\\|\\w*\\|(?P\\w+)" "\\^(?P[0-9]{2}\\.[0-9]{2})\\\\(?P[0-9]{2}\\.[0-9]{2})" @@ -44,10 +47,26 @@ "(?P[0-9]+)\\|(?P[0-9]+)" ) +_HEADER_RECORD_RE_NEXT = re.compile( + "^(?P[a-zA-Z])\\|(?P.)(?P.)" + "(?P.)(?P.)\\|\\w*\\|(?P\\w+)" + "\\^(?P[0-9]{2}\\.[0-9]{2})\\\\(?P[0-9]{2}\\.[0-9]{2})" + "\\\\(?P[0-9]{2}\\.[0-9]{2})" + "\\^(?P(\\w|-)+)\\|" + "A=(?P[0-9])\\^C=(?P[0-9]+)\\^R=(?P[0-9]+)\\" + "^S=(?P[0-9]+)\\^U=(?P[0-9]+)\\" + "^V=(?P[0-9]{2})(?P[0-9]{3})\\" + "^X=(?P[0-9]{3})(?P[0-9]{3})" + "(?P[0-9]{3})(?P[0-9]{3})" + "\\^a=(?P[0-9])\\^J=(?P[0-9])\\|" + "(?P[0-9]*)\\|\\|\\|\\|\\|(?P[P])\\|" + "(?P[0-9]+)\\|(?P[0-9]+)\\|" +) + _RESULT_RECORD_RE = re.compile( "^(?P[a-zA-Z])\\|(?P[0-9]+)\\|\\w*\\^\\w*\\^\\w*\\" "^(?P\\w+)\\|(?P[0-9]+)\\|(?P\\w+\\/\\w+)\\^" - "(?P[BPD])\\|\\|(?P[>[BPD])\\|\\|(?P[>[0-9]+)" ) @@ -81,9 +100,15 @@ class ContourHidDevice(driver.GlucometerDevice): currecno: Optional[int] = None - def __init__(self, usb_ids: tuple[int, int], device_path: Optional[str]) -> None: + def __init__( + self, + usb_ids: tuple[int, int], + device_path: Optional[str], + header_record_re: re.Pattern[str], + ) -> None: super().__init__(device_path) self._hid_session = hiddevice.HidSession(usb_ids, device_path) + self._header_record_re = header_record_re def read(self, r_size=blocksize): result = [] @@ -105,63 +130,14 @@ def write(self, data): self._hid_session.write(data) - USB_VENDOR_ID: int = 0x1A79 # Bayer Health Care LLC Contour - USB_PRODUCT_ID: int = 0x6002 - def parse_header_record(self, text): - header = _HEADER_RECORD_RE.search(text) - - self.field_del = header.group("field_del") - self.repeat_del = header.group("repeat_del") - self.component_del = header.group("component_del") - self.escape_del = header.group("escape_del") - - self.product_code = header.group("product_code") - self.dig_ver = header.group("dig_ver") - self.anlg_ver = header.group("anlg_ver") - self.agp_ver = header.group("agp_ver") - - self.serial_num = header.group("serial_num") - self.sku_id = header.group("sku_id") - self.res_marking = header.group("res_marking") - self.config_bits = header.group("config_bits") - self.lang = header.group("lang") - self.interv = header.group("interv") - self.ref_method = header.group("ref_method") - self.internal = header.group("internal") - - # U limit - self.unit = header.group("unit") - self.lo_bound = header.group("lo_bound") - self.hi_bound = header.group("hi_bound") - - # X field - self.hypo_limit = header.group("hypo_limit") - self.overall_low = header.group("overall_low") - self.pre_food_low = header.group("pre_food_low") - self.post_food_low = header.group("post_food_low") - self.overall_high = header.group("overall_high") - self.pre_food_high = header.group("pre_food_high") - self.post_food_high = header.group("post_food_high") - self.hyper_limit = header.group("hyper_limit") - - # Y field - self.upp_hyper = header.group("upp_hyper") - self.low_hyper = header.group("low_hyper") - self.upp_hypo = header.group("upp_hypo") - self.low_hypo = header.group("low_hypo") - self.upp_low_target = header.group("upp_low_target") - self.low_low_target = header.group("low_low_target") - self.upp_hi_target = header.group("upp_hi_target") - self.low_hi_target = header.group("low_hi_target") - - # Z field - self.trends = header.group("trends") - - self.total = header.group("total") - self.spec_ver = header.group("spec_ver") - # Datetime string in YYYYMMDDHHMM format - self.datetime = header.group("datetime") + header = self._header_record_re.search(text) + if not header: + raise FrameError("Couldn't parse header record", text) + for key, value in header.groupdict().items(): + setattr(self, key, value) + # Harmonize datetime string to YYYYMMDDHHMMSS format + self.datetime = self.datetime.ljust(14, '0') def checksum(self, text): """ @@ -205,7 +181,7 @@ def checkframe(self, frame) -> Optional[str]: def connect(self): """Connecting the device, nothing to be done. - All process is hadled by hiddevice + All process is handled by hiddevice """ pass @@ -227,13 +203,9 @@ def _get_info_record(self): else: pass - except FrameError as e: - print("Frame error") - raise e - - except Exception as e: - print("Uknown error occured") - raise e + except FrameError: + logger.error("Frame error") + raise def disconnect(self): """Disconnect the device, nothing to be done.""" @@ -255,15 +227,7 @@ def _get_glucose_unit(self) -> str: return self.unit def get_datetime(self) -> datetime.datetime: - datetime_str = self.datetime - return datetime.datetime( - int(datetime_str[0:4]), # year - int(datetime_str[4:6]), # month - int(datetime_str[6:8]), # day - int(datetime_str[8:10]), # hour - int(datetime_str[10:12]), # minute - 0, - ) + return datetime.datetime.strptime(self.datetime, '%Y%m%d%H%M%S') def sync(self) -> Generator[str, None, None]: """ @@ -272,49 +236,54 @@ def sync(self) -> Generator[str, None, None]: More info: https://bitbucket.org/iko/glucodump/src/default/ """ self.state = Mode.ESTABLISH - try: - tometer = "\x04" + tometer = "\x04" + result = None + foo = 0 + while True: + self.write(tometer) + if result is not None and self.state == Mode.DATA: + yield result result = None - foo = 0 - while True: - self.write(tometer) - if result is not None and self.state == Mode.DATA: - yield result - result = None - data_bytes = self.read() - data = data_bytes.decode() - - if self.state == Mode.ESTABLISH: - if data_bytes[-1] == 15: - # got a , send + data_bytes = self.read() + data = data_bytes.decode() + + if self.state == Mode.ESTABLISH: + if data_bytes[-1] == 15: + # got a , send + tometer = chr(foo) + foo += 1 + foo %= 256 + continue + if data_bytes[-1] == 5: + # got an , send + tometer = "\x06" + self.currecno = None + continue + if self.state == Mode.DATA: + if data_bytes[-1] == 4: + # got an , done + self.state = Mode.PRECOMMAND + break + stx = data.find("\x02") + if stx != -1: + # got , parse frame + try: + result = self.checkframe(data[stx:]) + if result == "L|1||N": + # got terminator record from Contour Next, send tometer = chr(foo) foo += 1 foo %= 256 - continue - if data_bytes[-1] == 5: - # got an , send - tometer = "\x06" - self.currecno = None - continue - if self.state == Mode.DATA: - if data_bytes[-1] == 4: - # got an , done self.state = Mode.PRECOMMAND break - stx = data.find("\x02") - if stx != -1: - # got , parse frame - try: - result = self.checkframe(data[stx:]) + else: tometer = "\x06" self.state = Mode.DATA - except FrameError: - tometer = "\x15" # Couldn't parse, - else: - # Got something we don't understand, it - tometer = "\x15" - except Exception as e: - raise e + except FrameError: + tometer = "\x15" # Couldn't parse, + else: + # Got something we don't understand, it + tometer = "\x15" def parse_result_record(self, text: str) -> dict[str, str]: result = _RESULT_RECORD_RE.search(text) diff --git a/setup.py b/setup.py index 88c17e3..c631927 100644 --- a/setup.py +++ b/setup.py @@ -12,6 +12,7 @@ # listed as mandatory for the feature. "accucheck_reports": [], "contourusb": ["construct", "hidapi"], + "contournext": ["construct", "hidapi"], "fsfreedomlite": ["pyserial"], "fsinsulinx": ["freestyle-hid>=1.0.2"], "fslibre": ["freestyle-hid>=1.0.2"], diff --git a/udev/69-glucometerutils.rules b/udev/69-glucometerutils.rules index 1ef2f58..bea3eaf 100644 --- a/udev/69-glucometerutils.rules +++ b/udev/69-glucometerutils.rules @@ -28,5 +28,9 @@ ATTRS{idVendor}=="10c4", ATTRS{idProduct}=="ea60", TAG+="uaccess" ATTRS{idVendor}=="1a61", TAG+="uaccess" +# Ascensia Contour Devices + +ATTRS{idVendor}=="1a79", TAG+="uaccess" + LABEL="glucometerutils_rules_end"