包含标签 kafka 的文章

Polygon在websoket的连接串数据的经常中断

本来使用工具或者服务 Pologon 美股数据供应商 Kafka Nettty Centos8.2 架构图 架构说明 Polygon是美股的数据供应商,提供美股tick级别的数据服务 美股开盘数据非常的大,平时可以达到6M/秒的传输速度,也就是说可以达到每秒6万条数据的传输速度 数据的传输是基于internet的传输,所以要保障数据在互联网上的快速传输必须基于长连接来实现,因此polygon提供了websocket服务 我们的app收到polygon websocket推送过来的数据要进行简单的解析和格式转换,然后发送到本地的kafka cluster中间件用来分发数据,给其他的应用来做大数据处理包括flink等大数据中间件 出现的问题 Polygon数据传输出现了频繁的中断1:Slow consumer slow consumer:https://polygon.io/docs/stocks/ws_getting-started 查看了一下polygon的技術文檔: if a client is consuming messages too slowly for too long, Polygon.io’s server-side buffer may get too large. If that happens, Polygon.io will terminate the WebSocket connection. You can check your account dashboard to see if a connection was terminated as a slow consumer. If this happens to you consistently, consider subscribing to fewer symbols or channels.……

阅读全文

kafka cluster的搭建

本文使用工具或者服务 Java 8 https://archive.apache.org/dist/kafka/3.0.0/kafka_2.13-3.0.0.tgz https://www.apache.org/dyn/closer.lua/zookeeper/zookeeper-3.7.0/apache-zookeeper-3.7.0-bin.tar.gz Centos8.2 Zookeeper 簡介 ZooKeeper是一个分布式协调服务,它的主要作用是为分布式系统提供一致性服务,提供的功能包括:配置维护、命名服务、分布式同步、组服务等。Kafka的运行依赖ZooKeeper。 Broker注册 Topic注册 生产者负载均衡 消费者负载均衡 消费者注册 Kafka 簡介 Kafka是由Apache软件基金会开发的一个开源流处理平台,由Scala和Java编写。Kafka是一种高吞吐量的分布式发布订阅消息系统,它可以处理消费者在网站中的所有动作流数据。 Kafka 是一种高吞吐量的分布式发布订阅消息系统,有如下特性: 通过O(1)的磁盘数据结构提供消息的持久化,这种结构对于即使数以TB的消息存储也能够保持长时间的稳定性能。 高吞吐量 即使是非常普通的硬件Kafka也可以支持每秒数百万的消息。 支持通过Kafka服务器和消费机集群来分区消息。 zookeeper 與kafka之間的關係 ZooKeeper用于分布式系统的协调,Kafka使用ZooKeeper也是基于相同的原因。ZooKeeper主要用来协调Kafka的各个broker,不仅可以实现broker的负载均衡,而且当增加了broker或者某个broker故障了,ZooKeeper将会通知生产者和消费者,这样可以保证整个系统正常运转。 安裝zooker和kafka 之前首先要做系統的優化和配置 文件句柄數的調整 oracle Jre8 安裝 磁盤的分區和挂載點必須滿足kafka大數量的存儲xfs分區 關閉selinux 防火墻暫時關閉(後面看可以統一規劃端口) 創建非root的kafka管理用戶並添加sudoer權限 注意事項 kafka是一個分佈式的消息系統,我們會臨時存儲消息,如果存儲消息的時間長,那麽需要大空間,所以分區的時候已經要注意,就是安裝kafka的時候存儲一定要挂載到大的硬盤空間上 Zookeeper 配置 # The number of milliseconds of each tick tickTime=2000 # The number of ticks that the initial # synchronization phase can take initLimit=10 # The number of ticks that can pass between # sending a request and getting an acknowledgement syncLimit=5 # the directory where the snapshot is stored.……

阅读全文