编程开源技术交流,分享技术与知识

网站首页 > 开源技术 正文

聊聊openmessaging-java(聊聊日常电视剧免费观看西瓜视频)

wxchong 2024-08-19 23:47:13 开源技术 11 ℃ 0 评论

本文主要研究一下openmessaging-java

maven

<dependency>

<groupId>io.openmessaging</groupId>

<artifactId>openmessaging-api</artifactId>

<version>0.3.1-alpha</version>

</dependency>

maven最新的是0.3.1-alpha,这里直接用源码的0.3.2-alpha-SNAPSHOT

producer

Producer

openmessaging-java/openmessaging-api/src/main/java/io/openmessaging/producer/Producer.java

/**

* A {@code Producer} is a simple object used to send messages on behalf

* of a {@code MessagingAccessPoint}. An instance of {@code Producer} is

* created by calling the {@link MessagingAccessPoint#createProducer()} method.

* <p>

* It provides various {@code send} methods to send a message to a specified destination,

* which is a {@code Queue} in OMS.

* <p>

* {@link Producer#send(Message)} means send a message to the destination synchronously,

* the calling thread will block until the send request complete.

* <p>

* {@link Producer#sendAsync(Message)} means send a message to the destination asynchronously,

* the calling thread won't block and will return immediately. Since the send call is asynchronous

* it returns a {@link Future} for the send result.

*

* @version OMS 1.0.0

* @since OMS 1.0.0

*/

public interface Producer extends MessageFactory, ServiceLifecycle {

/**

* Returns the attributes of this {@code Producer} instance.

* Changes to the return {@code KeyValue} are not reflected in physical {@code Producer}.

* <p>

* There are some standard attributes defined by OMS for {@code Producer}:

* <ul>

* <li> {@link OMSBuiltinKeys#PRODUCER_ID}, the unique producer id for a producer instance.

* <li> {@link OMSBuiltinKeys#OPERATION_TIMEOUT}, the default timeout period for operations of {@code Producer}.

* </ul>

*

* @return the attributes

*/

KeyValue attributes();

/**

* Sends a message to the specified destination synchronously, the destination should be preset to

* {@link Message#sysHeaders()}, other header fields as well.

*

* @param message a message will be sent

* @return the successful {@code SendResult}

* @throws OMSMessageFormatException if an invalid message is specified.

* @throws OMSTimeOutException if the given timeout elapses before the send operation completes

* @throws OMSRuntimeException if the {@code Producer} fails to send the message due to some internal error.

*/

SendResult send(Message message);

/**

* Sends a message to the specified destination synchronously, using the specified attributes, the destination

* should be preset to {@link Message#sysHeaders()}, other header fields as well.

*

* @param message a message will be sent

* @param attributes the specified attributes

* @return the successful {@code SendResult}

* @throws OMSMessageFormatException if an invalid message is specified.

* @throws OMSTimeOutException if the given timeout elapses before the send operation completes

* @throws OMSRuntimeException if the {@code Producer} fails to send the message due to some internal error.

*/

SendResult send(Message message, KeyValue attributes);

/**

* Sends a transactional message to the specified destination synchronously, using the specified attributes,

* the destination should be preset to {@link Message#sysHeaders()}, other header fields as well.

* <p>

* A transactional message will be exposed to consumer if and only if the local transaction

* branch has been committed, or be discarded if local transaction has been rolled back.

*

* @param message a transactional message will be sent

* @param branchExecutor local transaction executor associated with the message

* @param attributes the specified attributes

* @return the successful {@code SendResult}

* @throws OMSMessageFormatException if an invalid message is specified.

* @throws OMSTimeOutException if the given timeout elapses before the send operation completes

* @throws OMSRuntimeException if the {@code Producer} fails to send the message due to some internal error.

*/

SendResult send(Message message, LocalTransactionExecutor branchExecutor, KeyValue attributes);

/**

* Sends a message to the specified destination asynchronously, the destination should be preset to

* {@link Message#sysHeaders()}, other header fields as well.

* <p>

* The returned {@code Promise} will have the result once the operation completes, and the registered

* {@code FutureListener} will be notified, either because the operation was successful or because of an error.

*

* @param message a message will be sent

* @return the {@code Promise} of an asynchronous message send operation.

* @see Future

* @see FutureListener

*/

Future<SendResult> sendAsync(Message message);

/**

* Sends a message to the specified destination asynchronously, using the specified attributes, the destination

* should be preset to {@link Message#sysHeaders()}, other header fields as well.

* <p>

* The returned {@code Promise} will have the result once the operation completes, and the registered

* {@code FutureListener} will be notified, either because the operation was successful or because of an error.

*

* @param message a message will be sent

* @param attributes the specified attributes

* @return the {@code Promise} of an asynchronous message send operation.

* @see Future

* @see FutureListener

*/

Future<SendResult> sendAsync(Message message, KeyValue attributes);

/**

* Sends a message to the specified destination in one way, the destination should be preset to

* {@link Message.BuiltinKeys}, other header fields as well.

* <p>

* There is no {@code Promise} related or {@code RuntimeException} thrown. The calling thread doesn't

* care about the send result and also have no context to get the result.

*

* @param message a message will be sent

*/

void sendOneway(Message message);

/**

* Sends a message to the specified destination in one way, using the specified attributes, the destination

* should be preset to {@link Message.BuiltinKeys}, other header fields as well.

* <p>

* There is no {@code Promise} related or {@code RuntimeException} thrown. The calling thread doesn't

* care about the send result and also have no context to get the result.

*

* @param message a message will be sent

* @param properties the specified userHeaders

*/

void sendOneway(Message message, KeyValue properties);

/**

* Creates a {@code BatchMessageSender} to send message in batch manner.

*

* @return a {@code BatchMessageSender} instance

*/

BatchMessageSender createBatchMessageSender();

/**

* Adds a {@code ProducerInterceptor} to intercept send operations of producer.

*

* @param interceptor a producer interceptor

*/

void addInterceptor(ProducerInterceptor interceptor);

/**

* Removes a {@code ProducerInterceptor}

*

* @param interceptor a producer interceptor will be removed

*/

void removeInterceptor(ProducerInterceptor interceptor);

}

  • 这里提供了诸多发送消息的方法,也提供创建BatchMessageSender的方法
  • 消息这里封装为Message,设置的属性封装为KeyValue,发送结果封装为SendResult

Message

openmessaging-java/openmessaging-api/src/main/java/io/openmessaging/Message.java

/**

* The {@code Message} interface is the root interface of all OMS messages, and the most commonly used OMS message is

* {@link BytesMessage}.

* <p>

* Most message-oriented middleware (MOM) products treat messages as lightweight entities that consist of

* header and body and is used by separate applications to exchange a piece of information,

* like <a href="http://rocketmq.apache.org/">Apache RocketMQ</a>.

* <p>

* The header contains fields used by the messaging system that describes the message's meta information,

* like QoS level, origin, destination, and so on, while the body contains the application data being transmitted.

* <p>

* As for the header, OMS defines two kinds types: System Header and User Header,

* with respect to flexibility in vendor implementation and user usage.

* <ul>

* <li>

* System Header, OMS defines some standard attributes that represent the characteristics of the message.

* </li>

* <li>

* User Header, some OMS vendors may require enhanced extra attributes of the message

* or some users may want to clarify some customized attributes to draw the body.

* OMS provides the improved scalability for these scenarios.

* </li>

* </ul>

* The body contains the application data being transmitted,

* which is generally ignored by the messaging system and simply transmitted to its destination.

* <p>

* In BytesMessage, the body is just a byte array, may be compressed and uncompressed

* in the transmitting process by the messaging system.

* The application is responsible for explaining the concrete content and format of the message body,

* OMS is never aware of that.

*

* The body part is placed in the implementation classes of {@code Message}.

*

* @version OMS 1.0.0

* @since OMS 1.0.0

*/

public interface Message {

/**

* Returns all the system header fields of the {@code Message} object as a {@code KeyValue}.

*

* @return the system headers of a {@code Message}

* @see BuiltinKeys

*/

KeyValue sysHeaders();

/**

* Returns all the customized user header fields of the {@code Message} object as a {@code KeyValue}.

*

* @return the user headers of a {@code Message}

*/

KeyValue userHeaders();

/**

* Puts a {@code String}-{@code int} {@code KeyValue} entry to the system headers of a {@code Message}.

*

* @param key the key to be placed into the system headers

* @param value the value corresponding to <tt>key</tt>

*/

Message putSysHeaders(String key, int value);

/**

* Puts a {@code String}-{@code long} {@code KeyValue} entry to the system headers of a {@code Message}.

*

* @param key the key to be placed into the system headers

* @param value the value corresponding to <tt>key</tt>

*/

Message putSysHeaders(String key, long value);

/**

* Puts a {@code String}-{@code double} {@code KeyValue} entry to the system headers of a {@code Message}.

*

* @param key the key to be placed into the system headers

* @param value the value corresponding to <tt>key</tt>

*/

Message putSysHeaders(String key, double value);

/**

* Puts a {@code String}-{@code String} {@code KeyValue} entry to the system headers of a {@code Message}.

*

* @param key the key to be placed into the system headers

* @param value the value corresponding to <tt>key</tt>

*/

Message putSysHeaders(String key, String value);

/**

* Puts a {@code String}-{@code int} {@code KeyValue} entry to the user headers of a {@code Message}.

*

* @param key the key to be placed into the user headers

* @param value the value corresponding to <tt>key</tt>

*/

Message putUserHeaders(String key, int value);

/**

* Puts a {@code String}-{@code long} {@code KeyValue} entry to the user headers of a {@code Message}.

*

* @param key the key to be placed into the user headers

* @param value the value corresponding to <tt>key</tt>

*/

Message putUserHeaders(String key, long value);

/**

* Puts a {@code String}-{@code double} {@code KeyValue} entry to the user headers of a {@code Message}.

*

* @param key the key to be placed into the user headers

* @param value the value corresponding to <tt>key</tt>

*/

Message putUserHeaders(String key, double value);

/**

* Puts a {@code String}-{@code String} {@code KeyValue} entry to the user headers of a {@code Message}.

*

* @param key the key to be placed into the user headers

* @param value the value corresponding to <tt>key</tt>

*/

Message putUserHeaders(String key, String value);

/**

* Get message body

*

* @param type Message body type

* @param <T> Generic type

* @return message body

* @throws OMSMessageFormatException if the message body cannot be assigned to the specified type

*/

<T> T getBody(Class<T> type) throws OMSMessageFormatException;

interface BuiltinKeys {

/**

* The {@code MESSAGE_ID} header field contains a value that uniquely identifies

* each message sent by a {@code Producer}.

* <p>

* When a message is sent, MESSAGE_ID is assigned by the producer.

*/

String MESSAGE_ID = "MESSAGE_ID";

/**

* The {@code DESTINATION} header field contains the destination to which the message is being sent.

* <p>

* When a message is sent this value is set to the right {@code Queue}, then the message will be sent to

* the specified destination.

* <p>

* When a message is received, its destination is equivalent to the {@code Queue} where the message resides in.

*/

String DESTINATION = "DESTINATION";

//......

}

}

  • Message接口主要定义了设置header以及获取body的方法
  • 另外还内置了BuiltinKeys的header,比如MESSAGE_ID,DESTINATION等

KeyValue

openmessaging-java/openmessaging-api/src/main/java/io/openmessaging/KeyValue.java

/**

* The {@code KeyValue} class represents a persistent set of attributes,

* which supports method chaining.

* <p>

* A {@code KeyValue} object only allows {@code String} keys and can contain four primitive type

* as values: {@code int}, {@code long}, {@code double}, {@code String}.

* <p>

* The {@code KeyValue} is a replacement of {@code Properties}, with simpler

* interfaces and reasonable entry limits.

* <p>

* A {@code KeyValue} object may be used in concurrent scenarios, so the implementation

* of {@code KeyValue} should consider concurrent related issues.

*

* @version OMS 1.0.0

* @since OMS 1.0.0

*/

public interface KeyValue {

/**

* Inserts or replaces {@code int} value for the specified key.

*

* @param key the key to be placed into this {@code KeyValue} object

* @param value the value corresponding to <tt>key</tt>

*/

KeyValue put(String key, int value);

/**

* Inserts or replaces {@code long} value for the specified key.

*

* @param key the key to be placed into this {@code KeyValue} object

* @param value the value corresponding to <tt>key</tt>

*/

KeyValue put(String key, long value);

/**

* Inserts or replaces {@code double} value for the specified key.

*

* @param key the key to be placed into this {@code KeyValue} object

* @param value the value corresponding to <tt>key</tt>

*/

KeyValue put(String key, double value);

/**

* Inserts or replaces {@code String} value for the specified key.

*

* @param key the key to be placed into this {@code KeyValue} object

* @param value the value corresponding to <tt>key</tt>

*/

KeyValue put(String key, String value);

/**

* Searches for the {@code int} property with the specified key in this {@code KeyValue} object.

* If the key is not found in this property list, zero is returned.

*

* @param key the property key

* @return the value in this {@code KeyValue} object with the specified key value

* @see #put(String, int)

*/

int getInt(String key);

/**

* Searches for the {@code int} property with the specified key in this {@code KeyValue} object.

* If the key is not found in this property list, the default value argument is returned.

*

* @param key the property key

* @param defaultValue a default value

* @return the value in this {@code KeyValue} object with the specified key value

* @see #put(String, int)

*/

int getInt(String key, int defaultValue);

/**

* Searches for the {@code long} property with the specified key in this {@code KeyValue} object.

* If the key is not found in this property list, zero is returned.

*

* @param key the property key

* @return the value in this {@code KeyValue} object with the specified key value

* @see #put(String, long)

*/

long getLong(String key);

/**

* Searches for the {@code long} property with the specified key in this {@code KeyValue} object.

* If the key is not found in this property list, the default value argument is returned.

*

* @param key the property key

* @param defaultValue a default value

* @return the value in this {@code KeyValue} object with the specified key value

* @see #put(String, long)

*/

long getLong(String key, long defaultValue);

/**

* Searches for the {@code double} property with the specified key in this {@code KeyValue} object.

* If the key is not found in this property list, zero is returned.

*

* @param key the property key

* @return the value in this {@code KeyValue} object with the specified key value

* @see #put(String, double)

*/

double getDouble(String key);

/**

* Searches for the {@code double} property with the specified key in this {@code KeyValue} object.

* If the key is not found in this property list, the default value argument is returned.

*

* @param key the property key

* @param defaultValue a default value

* @return the value in this {@code KeyValue} object with the specified key value

* @see #put(String, double)

*/

double getDouble(String key, double defaultValue);

/**

* Searches for the {@code String} property with the specified key in this {@code KeyValue} object.

* If the key is not found in this property list, {@code null} is returned.

*

* @param key the property key

* @return the value in this {@code KeyValue} object with the specified key value

* @see #put(String, String)

*/

String getString(String key);

/**

* Searches for the {@code String} property with the specified key in this {@code KeyValue} object.

* If the key is not found in this property list, the default value argument is returned.

*

* @param key the property key

* @param defaultValue a default value

* @return the value in this {@code KeyValue} object with the specified key value

* @see #put(String, String)

*/

String getString(String key, String defaultValue);

/**

* Returns a {@link Set} view of the keys contained in this {@code KeyValue} object.

* <p>

* The set is backed by the {@code KeyValue}, so changes to the set are

* reflected in the @code KeyValue}, and vice-versa.

*

* @return the key set view of this {@code KeyValue} object.

*/

Set<String> keySet();

/**

* Tests if the specified {@code String} is a key in this {@code KeyValue}.

*

* @param key possible key

* @return <code>true</code> if and only if the specified key is in this {@code KeyValue}, <code>false</code>

* otherwise.

*/

boolean containsKey(String key);

}

  • 主要封装int、long、double、Sting这几种类型,其中key只能为String类型

SendResult

openmessaging-java/openmessaging-api/src/main/java/io/openmessaging/producer/SendResult.java

/**

* The result of sending a OMS message to server

* with the message id and some attributes.

*

* @version OMS 1.0.0

* @since OMS 1.0.0

*/

public interface SendResult {

/**

* The unique message id related to the {@code SendResult} instance.

*

* @return the message id

*/

String messageId();

}

  • 目前仅仅定义一个返回messageId的方法

BatchMessageSender

openmessaging-java/openmessaging-api/src/main/java/io/openmessaging/producer/BatchMessageSender.java

/**

* A message sender created through {@link Producer#createBatchMessageSender()}, to send

* messages in batch manner, and commit or roll back at the appropriate time.

*

* @version OMS 1.0.0

* @since OMS 1.0.0

*/

public interface BatchMessageSender {

/**

* Submits a message to this sender

*

* @param message a message to be sent

* @return this batch sender

*/

BatchMessageSender send(Message message);

/**

* Commits all the uncommitted messages in this sender.

*

* @throws OMSRuntimeException if the sender fails to commit the messages due to some internal error.

*/

void commit();

/**

* Discards all the uncommitted messages in this sender.

*/

void rollback();

/**

* Close this sender.

*/

void close();

}

  • 定义了send方法用于批量添加消息,这里的send语义不是太好,用pending好一些
  • 然后定义了commit用于提交批量消息、rollback用于回滚、close用于关闭这个sender

consumer

PullConsumer

openmessaging-java/openmessaging-api/src/main/java/io/openmessaging/consumer/PullConsumer.java

/**

* A {@code PullConsumer} pulls messages from the specified queue,

* and supports submit the consume result by acknowledgement.

*

* @version OMS 1.0.0

* @see MessagingAccessPoint#createPullConsumer()

* @since OMS 1.0.0

*/

public interface PullConsumer extends ServiceLifecycle {

/**

* Returns the attributes of this {@code PullConsumer} instance.

* Changes to the return {@code KeyValue} are not reflected in physical {@code PullConsumer}.

* <p>

* There are some standard attributes defined by OMS for {@code PullConsumer}:

* <ul>

* <li> {@link OMSBuiltinKeys#CONSUMER_ID}, the unique consumer id for a consumer instance.

* <li> {@link OMSBuiltinKeys#OPERATION_TIMEOUT}, the default timeout period for operations of {@code PullConsumer}.

* </ul>

*

* @return the attributes

*/

KeyValue attributes();

/**

* Attaches the {@code PullConsumer} to a specified queue.

*

* @param queueName a specified queue

* @return this {@code PullConsumer} instance

*/

PullConsumer attachQueue(String queueName);

/**

* Attaches the {@code PullConsumer} to a specified queue with some specified attributes..

*

* @param queueName a specified queue

* @param attributes some specified attributes

* @return this {@code PullConsumer} instance

*/

PullConsumer attachQueue(String queueName, KeyValue attributes);

/**

* Detaches the {@code PullConsumer} from a specified queue.

* <p>

* After the success call, this consumer won't receive new message

* from the specified queue any more.

*

* @param queueName a specified queue

* @return this {@code PullConsumer} instance

*/

PullConsumer detachQueue(String queueName);

/**

* Receives the next message from the attached queues of this consumer.

* <p>

* This call blocks indefinitely until a message is arrives, the timeout expires,

* or until this {@code PullConsumer} is shut down.

*

* @return the next message received from the attached queues, or null if the consumer is

* concurrently shut down or the timeout expires

* @throws OMSRuntimeException if the consumer fails to pull the next message due to some internal error.

*/

Message receive();

/**

* Receives the next message from the attached queues of this consumer, using the specified attributes.

* <p>

* This call blocks indefinitely until a message is arrives, the timeout expires,

* or until this {@code PullConsumer} is shut down.

*

* @param attributes the specified attributes

* @return the next message received from the attached queues, or null if the consumer is

* concurrently shut down or the timeout expires

* @throws OMSRuntimeException if the consumer fails to pull the next message due to some internal error.

*/

Message receive(KeyValue attributes);

/**

* Acknowledges the specified and consumed message with the unique message receipt handle.

* <p>

* Messages that have been received but not acknowledged may be redelivered.

*

* @param receiptHandle the receipt handle associated with the consumed message

* @throws OMSRuntimeException if the consumer fails to acknowledge the messages due to some internal error.

*/

void ack(String receiptHandle);

/**

* Acknowledges the specified and consumed message with the specified attributes.

* <p>

* Messages that have been received but not acknowledged may be redelivered.

*

* @param receiptHandle the receipt handle associated with the consumed message

* @param attributes the specified attributes

* @throws OMSRuntimeException if the consumer fails to acknowledge the messages due to some internal error.

*/

void ack(String receiptHandle, KeyValue attributes);

}

  • 定义了attachQueue以及attachQueue方法,用于绑定及解绑队列
  • receive方法用于接收消息,其实这里用receive语义不是太好,用pull可能好点
  • ack方法用于ack消息

PushConsumer

openmessaging-java/openmessaging-api/src/main/java/io/openmessaging/consumer/PushConsumer.java

/**

* A {@code PushConsumer} receives messages from multiple queues, these messages are pushed from

* MOM server to {@code PushConsumer} client.

*

* @version OMS 1.0.0

* @see MessagingAccessPoint#createPushConsumer()

* @since OMS 1.0.0

*/

public interface PushConsumer extends ServiceLifecycle {

/**

* Returns the attributes of this {@code PushConsumer} instance.

* Changes to the return {@code KeyValue} are not reflected in physical {@code PushConsumer}.

* <p>

* There are some standard attributes defined by OMS for {@code PushConsumer}:

* <ul>

* <li> {@link OMSBuiltinKeys#CONSUMER_ID}, the unique consumer id for a consumer instance.

* <li> {@link OMSBuiltinKeys#OPERATION_TIMEOUT}, the default timeout period for operations of {@code PushConsumer}.

* </ul>

*

* @return the attributes

*/

KeyValue attributes();

/**

* Resumes the {@code PushConsumer} after a suspend.

* <p>

* This method resumes the {@code PushConsumer} instance after it was suspended.

* The instance will not receive new messages between the suspend and resume calls.

*

* @throws OMSRuntimeException if the instance has not been suspended.

* @see PushConsumer#suspend()

*/

void resume();

/**

* Suspends the {@code PushConsumer} for later resumption.

* <p>

* This method suspends the consumer until it is resumed.

* The consumer will not receive new messages between the suspend and resume calls.

* <p>

* This method behaves exactly as if it simply performs the call {@code suspend(0)}.

*

* @throws OMSRuntimeException if the instance is not currently running.

* @see PushConsumer#resume()

*/

void suspend();

/**

* Suspends the {@code PushConsumer} for later resumption.

* <p>

* This method suspends the consumer until it is resumed or a

* specified amount of time has elapsed.

* The consumer will not receive new messages during the suspended state.

* <p>

* This method is similar to the {@link #suspend()} method, but it allows finer control

* over the amount of time to suspend, and the consumer will be suspended until it is resumed

* if the timeout is zero.

*

* @param timeout the maximum time to suspend in milliseconds.

* @throws OMSRuntimeException if the instance is not currently running.

*/

void suspend(long timeout);

/**

* This method is used to find out whether the {@code PushConsumer} is suspended.

*

* @return true if this {@code PushConsumer} is suspended, false otherwise

*/

boolean isSuspended();

/**

* Attaches the {@code PushConsumer} to a specified queue, with a {@code MessageListener}.

* <p>

* {@link MessageListener#onReceived(Message, MessageListener.Context)} will be called when new

* delivered message is coming.

*

* @param queueName a specified queue

* @param listener a specified listener to receive new message

* @return this {@code PushConsumer} instance

*/

PushConsumer attachQueue(String queueName, MessageListener listener);

/**

* Attaches the {@code PushConsumer} to a specified queue, with a {@code MessageListener} and some

* specified attributes.

* <p>

* {@link MessageListener#onReceived(Message, MessageListener.Context)} will be called when new

* delivered message is coming.

*

* @param queueName a specified queue

* @param listener a specified listener to receive new message

* @param attributes some specified attributes

* @return this {@code PushConsumer} instance

*/

PushConsumer attachQueue(String queueName, MessageListener listener, KeyValue attributes);

/**

* Detaches the {@code PushConsumer} from a specified queue.

* <p>

* After the success call, this consumer won't receive new message

* from the specified queue any more.

*

* @param queueName a specified queue

* @return this {@code PushConsumer} instance

*/

PushConsumer detachQueue(String queueName);

/**

* Adds a {@code ConsumerInterceptor} instance to this consumer.

*

* @param interceptor an interceptor instance

*/

void addInterceptor(ConsumerInterceptor interceptor);

/**

* Removes an interceptor from this consumer.

*

* @param interceptor an interceptor to be removed

*/

void removeInterceptor(ConsumerInterceptor interceptor);

}

  • 定义了attachQueue以及detachQueue方法用于绑定及解绑队列
  • attachQueue方法有个MessageListener用于消息push的时候的回调处理

MessageListener

openmessaging-java/openmessaging-api/src/main/java/io/openmessaging/consumer/MessageListener.java

/**

* A message listener must implement this {@code MessageListener} interface and register

* itself to a consumer instance to asynchronously receive messages.

*

* @version OMS 1.0.0

* @since OMS 1.0.0

*/

public interface MessageListener {

/**

* Callback method to receive incoming messages.

* <p>

* A message listener should handle different types of {@code Message}.

*

* @param message the received message object

* @param context the context delivered to the consume thread

*/

void onReceived(Message message, Context context);

interface Context {

/**

* Returns the attributes of this {@code MessageContext} instance.

*

* @return the attributes

*/

KeyValue attributes();

/**

* Acknowledges the specified and consumed message, which is related to this {@code MessageContext}.

* <p>

* Messages that have been received but not acknowledged may be redelivered.

*

* @throws OMSRuntimeException if the consumer fails to acknowledge the messages due to some internal error.

*/

void ack();

}

}

  • MessageListener定义了onReceived的回调方法,同时传递Context上下文对象

StreamingConsumer

openmessaging-java/openmessaging-api/src/main/java/io/openmessaging/consumer/StreamingConsumer.java

/**

* A {@code StreamingConsumer} provides low level APIs to open multiple streams

* from a specified queue and then retrieve messages from them through @{code StreamingIterator}.

*

* A {@code Queue} is consists of multiple streams, the {@code Stream} is an abstract concept and

* can be associated with partition in most messaging systems.

*

* @version OMS 1.0.0

* @since OMS 1.0.0

*/

public interface StreamingConsumer extends ServiceLifecycle {

/**

* Returns the attributes of this {@code StreamingConsumer} instance.

* Changes to the return {@code KeyValue} are not reflected in physical {@code StreamingConsumer}.

* <p>

* There are some standard attributes defined by OMS for {@code StreamingConsumer}:

* <ul>

* <li> {@link OMSBuiltinKeys#CONSUMER_ID}, the unique consumer id for a consumer instance.

* <li> {@link OMSBuiltinKeys#OPERATION_TIMEOUT}, the default timeout period for operations of {@code

* StreamingConsumer}.

* </ul>

*

* @return the attributes

*/

KeyValue attributes();

/**

* Creates a {@code StreamingIterator} from the end position of the specified stream.

*

* @param streamName the specified stream

* @return a message iterator at the begin position.

*/

StreamingIterator seekToEnd(String streamName);

/**

* Creates a {@code StreamingIterator} from the begin position of the specified stream.

*

* @param streamName the specified stream

* @return a message iterator at the begin position.

*/

StreamingIterator seekToBeginning(String streamName);

/**

* Creates a {@code StreamingIterator} from the fixed position of the specified stream.

* <p>

* Creates a {@code StreamingIterator} from the begin position if the given position

* is earlier than the first message's store position in this stream.

* <p>

* Creates a {@code StreamingIterator} from the end position, if the given position

* is later than the last message's store position in this stream.

* <p>

* The position is a {@code String} value, may represented by timestamp, offset, cursor,

* even a casual key.

*

* @param streamName the specified stream

* @param position the specified position

* @return a message iterator at the specified position

*/

StreamingIterator seek(String streamName, String position);

}

  • 流式处理主要是定义seek方法,返回的是StreamingIterator

StreamingIterator

openmessaging-java/openmessaging-api/src/main/java/io/openmessaging/consumer/StreamingIterator.java

/**

* A {@code StreamingIterator} is provided by {@code Stream} and is used to

* retrieve messages a specified stream like a read-only iterator.

*

* @version OMS 1.0.0

* @since OMS 1.0.0

*/

public interface StreamingIterator {

/**

* Returns the attributes of this {@code StreamingIterator} instance.

* <p>

* There are some standard attributes defined by OMS for {@code Stream}:

* <ul>

* <li> {@link OMSBuiltinKeys#OPERATION_TIMEOUT}, the default timeout period for operations of {@code

* Stream}.

* </ul>

*

* @return the attributes

*/

KeyValue attributes();

/**

* Returns {@code true} if this iterator has more messages when

* traversing the iterator in the forward direction.

*

* @return {@code true} if the iterator has more messages

*/

boolean hasNext();

/**

* Returns the next message in the iteration and advances the offset position.

* <p>

* This method may be called repeatedly to iterate through the iteration,

* or intermixed with calls to {@link #previous} to go back and forth.

*

* @return the next message in the list

* @throws OMSRuntimeException if the iteration has no more message, or

* the the consumer fails to receive the next message

*/

Message next();

/**

* Returns {@code true} if this partition iterator has more messages when

* traversing the iterator in the reverse direction.

*

* @return {@code true} if the partition iterator has more messages when

* traversing the iterator in the reverse direction

*/

boolean hasPrevious();

/**

* Returns the previous message in the iteration and moves the offset

* position backwards.

* <p>

* This method may be called repeatedly to iterate through the iteration backwards,

* or intermixed with calls to {@link #next} to go back and forth.

*

* @return the previous message in the list

* @throws OMSRuntimeException if the iteration has no previous message, or

* the the consumer fails to receive the previous message

*/

Message previous();

/**

* Returns the position of the message that would be returned by a

* subsequent call to {@link #next}.

*

* @return the position of the next message

* @throws OMSRuntimeException if the iteration has no next message

*/

String nextPosition();

/**

* Returns the position of the message that would be returned by a

* subsequent call to {@link #previous}.

*

* @return the position of the previous message

* @throws OMSRuntimeException if the iteration has no previous message

*/

String previousPosition();

}

  • 是一个read-only的iterator,类似java的iterator,提供了hasNext、next等方法

小结

openmessaging-java的定义java实现OpenMessaging的api规范,其中producer提供了单个发送也提供了批量发送的方法,而consumer则提供了pull、push以及stream三类消费方式。

doc

  • openmessaging.cloud
  • OpenMessaging Runtime Interface for Java

Tags:

本文暂时没有评论,来添加一个吧(●'◡'●)

欢迎 发表评论:

最近发表
标签列表