Skip to content
Closed
Show file tree
Hide file tree
Changes from 6 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
30 changes: 15 additions & 15 deletions README.md
Original file line number Diff line number Diff line change
@@ -1,26 +1,26 @@
# AliDip2BK
# BKP-LHC Client

Initial Repository based on work from @iclegrand in repository: https://github.com/iclegrand/AliDip2BK

Collect selected Info from the CERN DIP system (LHC & ALICE -DCS) and publish them into the Bookkeeping/InfoLogger systems
The BKP-LHC Client is a java based application which uses the CERN DIP `jar` dependency to consume events from desired tracks. These events are then either:
Comment thread
graduta marked this conversation as resolved.
- published on O2 Kafka Topics to be consumed further by O2 applications (e.g. ECS)
- updates the O2 Bookkeeping application via their HTTP endpoints.

A detailed description for this project is provided by Roberto in this document:
https://codimd.web.cern.ch/G0TSXqA1R8iPqWw2w2wuew


This program requires java 11 on a 64 bit system
(this is a constrain from the DIP library)

To test the java version run
### Requirements
- java 11 on a 64 bit system (this is a constrain from the DIP library)
Comment thread
graduta marked this conversation as resolved.
- to test the java version run
```
java -version
```

The run configuration is defined in the AliDip2BK.properties file.

To run the program :

sh runAliDip2BK.sh
### Configuration
The run configuration is defined in the `AliDip2BK.properties` file.

When the the program is stopped, it enters into the shutdown mode and it will
unsubscribe to the DIP data providers will wait to process the DipData queue
and saves the state of the fills and runs.
### Published Events
Currently the BKP-LHC-Client publishes on Kafka (topic: "dip.lhc.beam_mode") events for the start and end of stable beams in the format of `Ev_BeamModeEvent`. The proto file's source of truth is within the [Control Repository](https://github.com/AliceO2Group/Control/blob/master/common/protos/events.proto)
Comment thread
graduta marked this conversation as resolved.

### Bookkeeping Updates
- TBC
24 changes: 18 additions & 6 deletions src/alice/dip/AliDip2BK.java
Original file line number Diff line number Diff line change
@@ -1,10 +1,14 @@
/*************
* cil
**************/

/*
* Main Class
/**
* @license
* Copyright CERN and copyright holders of ALICE O2. This software is
* distributed under the terms of the GNU General Public License v3 (GPL
* Version 3), copied verbatim in the file "COPYING".
*
* See http://alice-o2.web.cern.ch/license for full licensing information.
*
* In applying this license CERN does not waive the privileges and immunities
* granted to it by virtue of its status as an Intergovernmental Organization
* or submit itself to any jurisdiction.
*/

package alice.dip;
Expand All @@ -18,6 +22,8 @@
import java.util.Date;
import java.util.Properties;

import alice.dip.kafka.BeamModeEventsKafkaProducer;

public class AliDip2BK implements Runnable {
public static String Version = "2.1.2 22-Jul-2025";
public static String DNSnode = "dipnsdev.cern.ch";
Expand Down Expand Up @@ -52,6 +58,7 @@ public class AliDip2BK implements Runnable {
BookkeepingClient bookkeepingClient;
StartOfRunKafkaConsumer kcs;
EndOfRunKafkaConsumer kce;
BeamModeEventsKafkaProducer beamModeEventsKafkaProducer;

public AliDip2BK() {
startDate = (new Date()).getTime();
Expand Down Expand Up @@ -83,6 +90,9 @@ public AliDip2BK() {

kce = new EndOfRunKafkaConsumer(dipMessagesProcessor);

beamModeEventsKafkaProducer = new BeamModeEventsKafkaProducer(AliDip2BK.bootstrapServers);
dipMessagesProcessor.setEventsProducer(beamModeEventsKafkaProducer);

shutdownProc();

Thread t = new Thread(this);
Expand Down Expand Up @@ -145,6 +155,8 @@ public void run() {
}
dipMessagesProcessor.saveState();
writeStat("AliDip2BK.stat", true);
beamModeEventsKafkaProducer.close();
log(4, "AliDip2BK", "Beam Mode Events Kafka Producer closed");
Comment thread
graduta marked this conversation as resolved.
}
});
}
Expand Down
33 changes: 29 additions & 4 deletions src/alice/dip/DipMessagesProcessor.java
Original file line number Diff line number Diff line change
@@ -1,6 +1,15 @@
/*************
* cil
**************/
/**
* @license
* Copyright CERN and copyright holders of ALICE O2. This software is
* distributed under the terms of the GNU General Public License v3 (GPL
* Version 3), copied verbatim in the file "COPYING".
*
* See http://alice-o2.web.cern.ch/license for full licensing information.
*
* In applying this license CERN does not waive the privileges and immunities
* granted to it by virtue of its status as an Intergovernmental Organization
* or submit itself to any jurisdiction.
*/

package alice.dip;

Expand All @@ -19,6 +28,7 @@
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

import alice.dip.kafka.BeamModeEventsKafkaProducer;
import cern.dip.BadParameter;
import cern.dip.DipData;
import cern.dip.DipTimestamp;
Expand Down Expand Up @@ -46,19 +56,28 @@ public class DipMessagesProcessor implements Runnable {
private BlockingQueue<MessageItem> outputQueue = new ArrayBlockingQueue<MessageItem>(100);

private final LuminosityManager luminosityManager;
private BeamModeEventsKafkaProducer beamModeEventsKafkaProducer;
Comment thread
graduta marked this conversation as resolved.
Outdated

public DipMessagesProcessor(BookkeepingClient bookkeepingClient, LuminosityManager luminosityManager) {

this.bookkeepingClient = bookkeepingClient;
this.luminosityManager = luminosityManager;

this.beamModeEventsKafkaProducer = null;
Thread t = new Thread(this);
t.start();
Comment thread
graduta marked this conversation as resolved.

currentAlice = new AliceInfoObj();
loadState();
}

/**
* Setter of events producer
* @param beamModeEventsKafkaProducer - instance of BeamModeEventsKafkaProducer to be used to send events
*/
public void setEventsProducer(BeamModeEventsKafkaProducer beamModeEventsKafkaProducer) {
this.beamModeEventsKafkaProducer = beamModeEventsKafkaProducer;
}

/*
* This method is used for receiving DipData messages from the Dip Client
*/
Expand Down Expand Up @@ -399,6 +418,9 @@ public void newSafeMode(long time, int val) {
} else {

currentFill.setBeamMode(time, "LOST BEAMS");
if (beamModeEventsKafkaProducer != null) {
beamModeEventsKafkaProducer.sendEvent(currentFill.fillNo, currentFill, time);
}
AliDip2BK.log(5, "ProcData.newSafeBeams", " CHANGE BEAM MODE TO LOST BEAMS !!! ");
}

Expand Down Expand Up @@ -580,6 +602,9 @@ public void newBeamMode(long date, String BeamMode) {
);
bookkeepingClient.updateLhcFill(currentFill);
saveState();
if (beamModeEventsKafkaProducer != null) {
beamModeEventsKafkaProducer.sendEvent(currentFill.fillNo, currentFill, date);
}
} else {
currentFill.endedTime = date;
bookkeepingClient.updateLhcFill(currentFill);
Expand Down
65 changes: 65 additions & 0 deletions src/alice/dip/kafka/BeamModeEventsKafkaProducer.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
/**
* @license
* Copyright CERN and copyright holders of ALICE O2. This software is
* distributed under the terms of the GNU General Public License v3 (GPL
* Version 3), copied verbatim in the file "COPYING".
*
* See http://alice-o2.web.cern.ch/license for full licensing information.
*
* In applying this license CERN does not waive the privileges and immunities
* granted to it by virtue of its status as an Intergovernmental Organization
* or submit itself to any jurisdiction.
*/

package alice.dip.kafka;

import org.apache.kafka.common.serialization.ByteArraySerializer;
import org.apache.kafka.common.serialization.IntegerSerializer;

import alice.dip.AliDip2BK;
import alice.dip.LhcInfoObj;
import alice.dip.kafka.events.Events;
import alice.dip.kafka.events.Common;
Comment thread
graduta marked this conversation as resolved.
Outdated

/**
* Kafka producer for LHC Beam Mode events, serialized using Protocol Buffers.
*/
public class BeamModeEventsKafkaProducer extends KafkaProducerInterface<Integer, byte[]> {
public static String KAFKA_PRODUCER_TOPIC_DIP = "dip.lhc.beam_mode";
Comment thread
graduta marked this conversation as resolved.
Outdated

/**
* Constructor to create a BeamModeEventsKafkaProducer
* @param bootstrapServers - Kafka bootstrap servers connection string in format of host:port
*/
public BeamModeEventsKafkaProducer(String bootstrapServers) {
super(bootstrapServers, KAFKA_PRODUCER_TOPIC_DIP, new IntegerSerializer(), new ByteArraySerializer());
AliDip2BK.log(2, "BeamModeEventsKafkaProducer", "Initialized producer for topic: " + KAFKA_PRODUCER_TOPIC_DIP);
}

/**
* Given a fill number for partitioning, a LhcInfoObj containing fill information,
* and a timestamp, creates and sends a proto serialized Beam Mode Event to the Kafka topic.
* @param fillNumber - fill number to be used for partition to ensure ordering
* @param fill - LhcInfoObj containing fill information
* @param timestamp - event timestamp at which the beam mode change event was received from DIP
*/
public void sendEvent(Integer fillNumber, LhcInfoObj fill, long timestamp) {
Common.BeamInfo beamInfo = Common.BeamInfo.newBuilder()
.setStableBeamsStart(fill.getStableBeamStart())
.setStableBeamsEnd(fill.getStableBeamStop())
.setFillNumber(fill.fillNo)
.setFillingSchemeName(fill.LHCFillingSchemeName)
.setBeamMode(Common.BeamMode.valueOf(fill.getBeamMode()))
.setBeamType(fill.beamType)
Comment thread
graduta marked this conversation as resolved.
.build();
Comment thread
graduta marked this conversation as resolved.
Comment thread
graduta marked this conversation as resolved.

Events.Ev_BeamModeEvent event = Events.Ev_BeamModeEvent.newBuilder()
.setTimestamp(timestamp)
.setBeamInfo(beamInfo)
.build();
byte[] value = event.toByteArray();

send(fillNumber, value);
AliDip2BK.log(2, "BeamModeEventsKafkaProducer", "Sent Beam Mode event for fill " + fill.fillNo + " with mode " + fill.getBeamMode() + " at timestamp " + timestamp);
}
}
64 changes: 64 additions & 0 deletions src/alice/dip/kafka/KafkaProducerInterface.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
/**
* @license
* Copyright CERN and copyright holders of ALICE O2. This software is
* distributed under the terms of the GNU General Public License v3 (GPL
* Version 3), copied verbatim in the file "COPYING".
*
* See http://alice-o2.web.cern.ch/license for full licensing information.
*
* In applying this license CERN does not waive the privileges and immunities
* granted to it by virtue of its status as an Intergovernmental Organization
* or submit itself to any jurisdiction.
*/

package alice.dip.kafka;

import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.Serializer;

import java.util.Properties;

/**
* Generic Kafka Producer interface to send messages to a specified topic.
* @param <K> - Type of the message key (to be used for partitioning)
* @param <V> - Type of the message value (payload)
*/
public class KafkaProducerInterface<K, V> implements AutoCloseable {
private final KafkaProducer<K, V> producer;
private final String topic;

/**
* Constructor to create a KafkaProducerInterface
* @param bootstrapServers - Kafka bootstrap servers connection string in format of host:port
* @param topic - Kafka topic to which messages will be sent
* @param keySerializer - Kafka supported serializer for the message key
* @param valueSerializer - Kafka supported serializer for the message value
*/
public KafkaProducerInterface(String bootstrapServers, String topic, Serializer<K> keySerializer, Serializer<V> valueSerializer) {
this.topic = topic;
Properties props = new Properties();
props.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
props.setProperty(ProducerConfig.ACKS_CONFIG, "all");
Comment thread
graduta marked this conversation as resolved.
this.producer = new KafkaProducer<>(props, keySerializer, valueSerializer);
}

/**
* Send a message to the configured Kafka topic
* @param key - message key for partitioning
* @param value - message value (payload)
*/
public void send(K key, V value) {
ProducerRecord<K, V> record = new ProducerRecord<>(topic, key, value);
producer.send(record);
Comment thread
graduta marked this conversation as resolved.
Outdated
}

/**
* Method to close the Kafka producer instance
*/
@Override
public void close() {
producer.close();
}
}
Loading